# tests/gateway/test_pairing.py
  1  """Tests for gateway/pairing.py — DM pairing security system."""
  2  
  3  import json
  4  import os
  5  import time
  6  from pathlib import Path
  7  from unittest.mock import patch
  8  
  9  from gateway.pairing import (
 10      PairingStore,
 11      ALPHABET,
 12      CODE_LENGTH,
 13      CODE_TTL_SECONDS,
 14      RATE_LIMIT_SECONDS,
 15      MAX_PENDING_PER_PLATFORM,
 16      MAX_FAILED_ATTEMPTS,
 17      LOCKOUT_SECONDS,
 18      _secure_write,
 19  )
 20  
 21  
 22  def _make_store(tmp_path):
 23      """Create a PairingStore with PAIRING_DIR pointed to tmp_path."""
 24      with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 25          return PairingStore()
 26  
 27  
 28  # ---------------------------------------------------------------------------
 29  # _secure_write
 30  # ---------------------------------------------------------------------------
 31  
 32  
 33  class TestSecureWrite:
 34      def test_creates_parent_dirs(self, tmp_path):
 35          target = tmp_path / "sub" / "dir" / "file.json"
 36          _secure_write(target, '{"hello": "world"}')
 37          assert target.exists()
 38          assert json.loads(target.read_text()) == {"hello": "world"}
 39  
 40      def test_sets_file_permissions(self, tmp_path):
 41          target = tmp_path / "secret.json"
 42          _secure_write(target, "data")
 43          mode = oct(target.stat().st_mode & 0o777)
 44          assert mode == "0o600"
 45  
 46  
 47  # ---------------------------------------------------------------------------
 48  # Code generation
 49  # ---------------------------------------------------------------------------
 50  
 51  
 52  class TestCodeGeneration:
 53      def test_code_format(self, tmp_path):
 54          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 55              store = PairingStore()
 56              code = store.generate_code("telegram", "user1", "Alice")
 57          assert isinstance(code, str) and len(code) == CODE_LENGTH
 58          assert len(code) == CODE_LENGTH
 59          assert all(c in ALPHABET for c in code)
 60  
 61      def test_code_uniqueness(self, tmp_path):
 62          """Multiple codes for different users should be distinct."""
 63          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 64              store = PairingStore()
 65              codes = set()
 66              for i in range(3):
 67                  code = store.generate_code("telegram", f"user{i}")
 68                  assert isinstance(code, str) and len(code) == CODE_LENGTH
 69                  codes.add(code)
 70          assert len(codes) == 3
 71  
 72      def test_stores_pending_entry(self, tmp_path):
 73          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 74              store = PairingStore()
 75              code = store.generate_code("telegram", "user1", "Alice")
 76              pending = store.list_pending("telegram")
 77          assert len(pending) == 1
 78          assert pending[0]["code"] == code
 79          assert pending[0]["user_id"] == "user1"
 80          assert pending[0]["user_name"] == "Alice"
 81  
 82  
 83  # ---------------------------------------------------------------------------
 84  # Rate limiting
 85  # ---------------------------------------------------------------------------
 86  
 87  
 88  class TestRateLimiting:
 89      def test_same_user_rate_limited(self, tmp_path):
 90          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 91              store = PairingStore()
 92              code1 = store.generate_code("telegram", "user1")
 93              code2 = store.generate_code("telegram", "user1")
 94          assert isinstance(code1, str) and len(code1) == CODE_LENGTH
 95          assert code2 is None  # rate limited
 96  
 97      def test_different_users_not_rate_limited(self, tmp_path):
 98          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
 99              store = PairingStore()
100              code1 = store.generate_code("telegram", "user1")
101              code2 = store.generate_code("telegram", "user2")
102          assert isinstance(code1, str) and len(code1) == CODE_LENGTH
103          assert isinstance(code2, str) and len(code2) == CODE_LENGTH
104  
105      def test_rate_limit_expires(self, tmp_path):
106          with patch("gateway.pairing.PAIRING_DIR", tmp_path):
107              store = PairingStore()
108              code1 = store.generate_code("telegram", "user1")
109              assert isinstance(code1, str) and len(code1) == CODE_LENGTH
110  
111              # Simulate rate limit expiry
112              limits = store._load_json(store._rate_limit_path())
113              limits["telegram:user1"] = time.time() - RATE_LIMIT_SECONDS - 1
114              store._save_json(store._rate_limit_path(), limits)
115  
116              code2 = store.generate_code("telegram", "user1")
117          assert isinstance(code2, str) and len(code2) == CODE_LENGTH
118          assert code2 != code1
119  
120  
121  # ---------------------------------------------------------------------------
122  # Max pending limit
123  # ---------------------------------------------------------------------------
124  
125  
class TestMaxPending:
    """Cap on the number of pending codes per platform."""

    def test_max_pending_per_platform(self, tmp_path):
        """The request after MAX_PENDING_PER_PLATFORM successes returns None."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            codes = [
                store.generate_code("telegram", f"user{i}")
                for i in range(MAX_PENDING_PER_PLATFORM + 1)
            ]

        # The list holds MAX_PENDING_PER_PLATFORM + 1 results: everything but
        # the last should be a code, and the last should be blocked.
        *accepted, overflow = codes
        for issued in accepted:
            assert isinstance(issued, str) and len(issued) == CODE_LENGTH
        assert overflow is None

    def test_different_platforms_independent(self, tmp_path):
        """Filling one platform's quota does not block another platform."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            for i in range(MAX_PENDING_PER_PLATFORM):
                store.generate_code("telegram", f"user{i}")
            # Different platform should still work
            discord_code = store.generate_code("discord", "user0")
        assert isinstance(discord_code, str)
        assert len(discord_code) == CODE_LENGTH
148  
149  
150  # ---------------------------------------------------------------------------
151  # Approval flow
152  # ---------------------------------------------------------------------------
153  
154  
class TestApprovalFlow:
    """Approving codes via ``PairingStore.approve_code``."""

    def test_approve_valid_code(self, tmp_path):
        """Approving a generated code returns a dict describing the user."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            approval = store.approve_code("telegram", code)

        assert isinstance(approval, dict)
        for required_key in ("user_id", "user_name"):
            assert required_key in approval
        assert approval["user_id"] == "user1"
        assert approval["user_name"] == "Alice"

    def test_approved_user_is_approved(self, tmp_path):
        """After approval, is_approved reports True for that user."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            store.approve_code("telegram", code)
            approved = store.is_approved("telegram", "user1")
            assert approved is True

    def test_unapproved_user_not_approved(self, tmp_path):
        """is_approved reports False for a user that was never approved."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            approved = store.is_approved("telegram", "nonexistent")
            assert approved is False

    def test_approve_removes_from_pending(self, tmp_path):
        """An approved code no longer appears in the pending list."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1")
            store.approve_code("telegram", code)
            still_pending = store.list_pending("telegram")
        assert len(still_pending) == 0

    def test_approve_case_insensitive(self, tmp_path):
        """A lower-cased copy of the code is still accepted."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            lowered = code.lower()
            approval = store.approve_code("telegram", lowered)
        assert isinstance(approval, dict)
        assert approval["user_id"] == "user1"
        assert approval["user_name"] == "Alice"

    def test_approve_strips_whitespace(self, tmp_path):
        """A code padded with surrounding spaces is still accepted."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            padded = f"  {code}  "
            approval = store.approve_code("telegram", padded)
        assert isinstance(approval, dict)
        assert approval["user_id"] == "user1"
        assert approval["user_name"] == "Alice"

    def test_invalid_code_returns_none(self, tmp_path):
        """Approving a code that was never issued returns None."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            approval = store.approve_code("telegram", "INVALIDCODE")
        assert approval is None
211  
212  
213  # ---------------------------------------------------------------------------
214  # Lockout after failed attempts
215  # ---------------------------------------------------------------------------
216  
217  
class TestLockout:
    """Platform lockout after repeated failed approvals."""

    def test_lockout_after_max_failures(self, tmp_path):
        """MAX_FAILED_ATTEMPTS wrong codes put the platform in lockout."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            # Generate a valid code so platform has data
            store.generate_code("telegram", "user1")

            # Exhaust failed attempts
            for _attempt in range(MAX_FAILED_ATTEMPTS):
                store.approve_code("telegram", "WRONGCODE")

            # Platform should now be locked out
            locked = store._is_locked_out("telegram")
            assert locked is True

    def test_lockout_blocks_code_generation(self, tmp_path):
        """While locked out, generate_code returns None."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            for _attempt in range(MAX_FAILED_ATTEMPTS):
                store.approve_code("telegram", "WRONG")

            blocked = store.generate_code("telegram", "newuser")
        assert blocked is None

    def test_lockout_expires(self, tmp_path):
        """A lockout timestamp in the past no longer counts as locked out."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            for _attempt in range(MAX_FAILED_ATTEMPTS):
                store.approve_code("telegram", "WRONG")

            # Simulate lockout expiry by overwriting the stored lockout
            # value with a moment one second in the past.
            rate_file = store._rate_limit_path()
            limits = store._load_json(rate_file)
            limits["_lockout:telegram"] = time.time() - 1  # expired
            store._save_json(rate_file, limits)

            locked = store._is_locked_out("telegram")
            assert locked is False
254  
255  
256  # ---------------------------------------------------------------------------
257  # Code expiry
258  # ---------------------------------------------------------------------------
259  
260  
class TestCodeExpiry:
    """Handling of pending codes older than CODE_TTL_SECONDS."""

    @staticmethod
    def _backdate(store, platform, code):
        """Rewrite the pending entry's created_at to just past the TTL.

        Must be called while PAIRING_DIR is patched, like any other store
        operation in these tests.
        """
        pending_file = store._pending_path(platform)
        entries = store._load_json(pending_file)
        entries[code]["created_at"] = time.time() - CODE_TTL_SECONDS - 1
        store._save_json(pending_file, entries)

    def test_expired_codes_cleaned_up(self, tmp_path):
        """An expired code is absent from the next list_pending result."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1")

            # Manually expire the code
            self._backdate(store, "telegram", code)

            # Cleanup happens on next operation
            remaining = store.list_pending("telegram")
        assert len(remaining) == 0

    def test_expired_code_cannot_be_approved(self, tmp_path):
        """Approving an expired code returns None."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1")

            # Expire it
            self._backdate(store, "telegram", code)

            approval = store.approve_code("telegram", code)
        assert approval is None
288  
289  
290  # ---------------------------------------------------------------------------
291  # Revoke
292  # ---------------------------------------------------------------------------
293  
294  
class TestRevoke:
    """Revoking previously approved users via ``PairingStore.revoke``."""

    def test_revoke_approved_user(self, tmp_path):
        """Revoking an approved user returns True and un-approves them."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            store.approve_code("telegram", code)
            assert store.is_approved("telegram", "user1") is True

            was_revoked = store.revoke("telegram", "user1")
            assert was_revoked is True
            # Checked under the same patch so the store sees tmp_path.
            assert store.is_approved("telegram", "user1") is False

    def test_revoke_nonexistent_returns_false(self, tmp_path):
        """Revoking a user that was never approved returns False."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            was_revoked = store.revoke("telegram", "nobody")
            assert was_revoked is False
312  
313  
314  # ---------------------------------------------------------------------------
315  # List & clear
316  # ---------------------------------------------------------------------------
317  
318  
class TestListAndClear:
    """Listing approved users and clearing pending codes."""

    def test_list_approved(self, tmp_path):
        """list_approved(platform) returns the approved user with its platform."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            code = store.generate_code("telegram", "user1", "Alice")
            store.approve_code("telegram", code)
            approved = store.list_approved("telegram")
        assert len(approved) == 1
        only_entry = approved[0]
        assert only_entry["user_id"] == "user1"
        assert only_entry["platform"] == "telegram"

    def test_list_approved_all_platforms(self, tmp_path):
        """list_approved() with no argument covers every platform."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            # Approve one user on each of two platforms, in this order.
            for platform, user_id in (("telegram", "user1"), ("discord", "user2")):
                code = store.generate_code(platform, user_id)
                store.approve_code(platform, code)
            approved = store.list_approved()
        assert len(approved) == 2

    def test_clear_pending(self, tmp_path):
        """clear_pending(platform) reports the count and empties the list."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            for user_id in ("user1", "user2"):
                store.generate_code("telegram", user_id)
            cleared = store.clear_pending("telegram")
            remaining = store.list_pending("telegram")
        assert cleared == 2
        assert len(remaining) == 0

    def test_clear_pending_all_platforms(self, tmp_path):
        """clear_pending() with no argument counts codes across platforms."""
        with patch("gateway.pairing.PAIRING_DIR", tmp_path):
            store = PairingStore()
            store.generate_code("telegram", "user1")
            store.generate_code("discord", "user2")
            cleared = store.clear_pending()
        assert cleared == 2