test_pairing.py
1 """Tests for gateway/pairing.py — DM pairing security system.""" 2 3 import json 4 import os 5 import time 6 from pathlib import Path 7 from unittest.mock import patch 8 9 from gateway.pairing import ( 10 PairingStore, 11 ALPHABET, 12 CODE_LENGTH, 13 CODE_TTL_SECONDS, 14 RATE_LIMIT_SECONDS, 15 MAX_PENDING_PER_PLATFORM, 16 MAX_FAILED_ATTEMPTS, 17 LOCKOUT_SECONDS, 18 _secure_write, 19 ) 20 21 22 def _make_store(tmp_path): 23 """Create a PairingStore with PAIRING_DIR pointed to tmp_path.""" 24 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 25 return PairingStore() 26 27 28 # --------------------------------------------------------------------------- 29 # _secure_write 30 # --------------------------------------------------------------------------- 31 32 33 class TestSecureWrite: 34 def test_creates_parent_dirs(self, tmp_path): 35 target = tmp_path / "sub" / "dir" / "file.json" 36 _secure_write(target, '{"hello": "world"}') 37 assert target.exists() 38 assert json.loads(target.read_text()) == {"hello": "world"} 39 40 def test_sets_file_permissions(self, tmp_path): 41 target = tmp_path / "secret.json" 42 _secure_write(target, "data") 43 mode = oct(target.stat().st_mode & 0o777) 44 assert mode == "0o600" 45 46 47 # --------------------------------------------------------------------------- 48 # Code generation 49 # --------------------------------------------------------------------------- 50 51 52 class TestCodeGeneration: 53 def test_code_format(self, tmp_path): 54 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 55 store = PairingStore() 56 code = store.generate_code("telegram", "user1", "Alice") 57 assert isinstance(code, str) and len(code) == CODE_LENGTH 58 assert len(code) == CODE_LENGTH 59 assert all(c in ALPHABET for c in code) 60 61 def test_code_uniqueness(self, tmp_path): 62 """Multiple codes for different users should be distinct.""" 63 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 64 store = PairingStore() 65 codes = set() 66 for i in range(3): 67 code = store.generate_code("telegram", f"user{i}") 68 assert isinstance(code, str) and len(code) == CODE_LENGTH 69 codes.add(code) 70 assert len(codes) == 3 71 72 def test_stores_pending_entry(self, tmp_path): 73 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 74 store = PairingStore() 75 code = store.generate_code("telegram", "user1", "Alice") 76 pending = store.list_pending("telegram") 77 assert len(pending) == 1 78 assert pending[0]["code"] == code 79 assert pending[0]["user_id"] == "user1" 80 assert pending[0]["user_name"] == "Alice" 81 82 83 # --------------------------------------------------------------------------- 84 # Rate limiting 85 # --------------------------------------------------------------------------- 86 87 88 class TestRateLimiting: 89 def test_same_user_rate_limited(self, tmp_path): 90 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 91 store = PairingStore() 92 code1 = store.generate_code("telegram", "user1") 93 code2 = store.generate_code("telegram", "user1") 94 assert isinstance(code1, str) and len(code1) == CODE_LENGTH 95 assert code2 is None # rate limited 96 97 def test_different_users_not_rate_limited(self, tmp_path): 98 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 99 store = PairingStore() 100 code1 = store.generate_code("telegram", "user1") 101 code2 = store.generate_code("telegram", "user2") 102 assert isinstance(code1, str) and len(code1) == CODE_LENGTH 103 assert isinstance(code2, str) and len(code2) == CODE_LENGTH 104 105 def test_rate_limit_expires(self, tmp_path): 106 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 107 store = PairingStore() 108 code1 = store.generate_code("telegram", "user1") 109 assert isinstance(code1, str) and len(code1) == CODE_LENGTH 110 111 # Simulate rate limit expiry 112 limits = store._load_json(store._rate_limit_path()) 113 limits["telegram:user1"] = time.time() - RATE_LIMIT_SECONDS - 1 114 store._save_json(store._rate_limit_path(), limits) 115 116 code2 = store.generate_code("telegram", "user1") 117 assert isinstance(code2, str) and len(code2) == CODE_LENGTH 118 assert code2 != code1 119 120 121 # --------------------------------------------------------------------------- 122 # Max pending limit 123 # --------------------------------------------------------------------------- 124 125 126 class TestMaxPending: 127 def test_max_pending_per_platform(self, tmp_path): 128 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 129 store = PairingStore() 130 codes = [] 131 for i in range(MAX_PENDING_PER_PLATFORM + 1): 132 code = store.generate_code("telegram", f"user{i}") 133 codes.append(code) 134 135 # First MAX_PENDING_PER_PLATFORM should succeed 136 assert all(isinstance(c, str) and len(c) == CODE_LENGTH for c in codes[:MAX_PENDING_PER_PLATFORM]) 137 # Next one should be blocked 138 assert codes[MAX_PENDING_PER_PLATFORM] is None 139 140 def test_different_platforms_independent(self, tmp_path): 141 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 142 store = PairingStore() 143 for i in range(MAX_PENDING_PER_PLATFORM): 144 store.generate_code("telegram", f"user{i}") 145 # Different platform should still work 146 code = store.generate_code("discord", "user0") 147 assert isinstance(code, str) and len(code) == CODE_LENGTH 148 149 150 # --------------------------------------------------------------------------- 151 # Approval flow 152 # --------------------------------------------------------------------------- 153 154 155 class TestApprovalFlow: 156 def test_approve_valid_code(self, tmp_path): 157 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 158 store = PairingStore() 159 code = store.generate_code("telegram", "user1", "Alice") 160 result = store.approve_code("telegram", code) 161 162 assert isinstance(result, dict) 163 assert "user_id" in result 164 assert "user_name" in result 165 assert result["user_id"] == "user1" 166 assert result["user_name"] == "Alice" 167 168 def test_approved_user_is_approved(self, tmp_path): 169 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 170 store = PairingStore() 171 code = store.generate_code("telegram", "user1", "Alice") 172 store.approve_code("telegram", code) 173 assert store.is_approved("telegram", "user1") is True 174 175 def test_unapproved_user_not_approved(self, tmp_path): 176 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 177 store = PairingStore() 178 assert store.is_approved("telegram", "nonexistent") is False 179 180 def test_approve_removes_from_pending(self, tmp_path): 181 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 182 store = PairingStore() 183 code = store.generate_code("telegram", "user1") 184 store.approve_code("telegram", code) 185 pending = store.list_pending("telegram") 186 assert len(pending) == 0 187 188 def test_approve_case_insensitive(self, tmp_path): 189 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 190 store = PairingStore() 191 code = store.generate_code("telegram", "user1", "Alice") 192 result = store.approve_code("telegram", code.lower()) 193 assert isinstance(result, dict) 194 assert result["user_id"] == "user1" 195 assert result["user_name"] == "Alice" 196 197 def test_approve_strips_whitespace(self, tmp_path): 198 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 199 store = PairingStore() 200 code = store.generate_code("telegram", "user1", "Alice") 201 result = store.approve_code("telegram", f" {code} ") 202 assert isinstance(result, dict) 203 assert result["user_id"] == "user1" 204 assert result["user_name"] == "Alice" 205 206 def test_invalid_code_returns_none(self, tmp_path): 207 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 208 store = PairingStore() 209 result = store.approve_code("telegram", "INVALIDCODE") 210 assert result is None 211 212 213 # --------------------------------------------------------------------------- 214 # Lockout after failed attempts 215 # --------------------------------------------------------------------------- 216 217 218 class TestLockout: 219 def test_lockout_after_max_failures(self, tmp_path): 220 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 221 store = PairingStore() 222 # Generate a valid code so platform has data 223 store.generate_code("telegram", "user1") 224 225 # Exhaust failed attempts 226 for _ in range(MAX_FAILED_ATTEMPTS): 227 store.approve_code("telegram", "WRONGCODE") 228 229 # Platform should now be locked out — can't generate new codes 230 assert store._is_locked_out("telegram") is True 231 232 def test_lockout_blocks_code_generation(self, tmp_path): 233 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 234 store = PairingStore() 235 for _ in range(MAX_FAILED_ATTEMPTS): 236 store.approve_code("telegram", "WRONG") 237 238 code = store.generate_code("telegram", "newuser") 239 assert code is None 240 241 def test_lockout_expires(self, tmp_path): 242 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 243 store = PairingStore() 244 for _ in range(MAX_FAILED_ATTEMPTS): 245 store.approve_code("telegram", "WRONG") 246 247 # Simulate lockout expiry 248 limits = store._load_json(store._rate_limit_path()) 249 lockout_key = "_lockout:telegram" 250 limits[lockout_key] = time.time() - 1 # expired 251 store._save_json(store._rate_limit_path(), limits) 252 253 assert store._is_locked_out("telegram") is False 254 255 256 # --------------------------------------------------------------------------- 257 # Code expiry 258 # --------------------------------------------------------------------------- 259 260 261 class TestCodeExpiry: 262 def test_expired_codes_cleaned_up(self, tmp_path): 263 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 264 store = PairingStore() 265 code = store.generate_code("telegram", "user1") 266 267 # Manually expire the code 268 pending = store._load_json(store._pending_path("telegram")) 269 pending[code]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 270 store._save_json(store._pending_path("telegram"), pending) 271 272 # Cleanup happens on next operation 273 remaining = store.list_pending("telegram") 274 assert len(remaining) == 0 275 276 def test_expired_code_cannot_be_approved(self, tmp_path): 277 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 278 store = PairingStore() 279 code = store.generate_code("telegram", "user1") 280 281 # Expire it 282 pending = store._load_json(store._pending_path("telegram")) 283 pending[code]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 284 store._save_json(store._pending_path("telegram"), pending) 285 286 result = store.approve_code("telegram", code) 287 assert result is None 288 289 290 # --------------------------------------------------------------------------- 291 # Revoke 292 # --------------------------------------------------------------------------- 293 294 295 class TestRevoke: 296 def test_revoke_approved_user(self, tmp_path): 297 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 298 store = PairingStore() 299 code = store.generate_code("telegram", "user1", "Alice") 300 store.approve_code("telegram", code) 301 assert store.is_approved("telegram", "user1") is True 302 303 revoked = store.revoke("telegram", "user1") 304 assert revoked is True 305 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 306 assert store.is_approved("telegram", "user1") is False 307 308 def test_revoke_nonexistent_returns_false(self, tmp_path): 309 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 310 store = PairingStore() 311 assert store.revoke("telegram", "nobody") is False 312 313 314 # --------------------------------------------------------------------------- 315 # List & clear 316 # --------------------------------------------------------------------------- 317 318 319 class TestListAndClear: 320 def test_list_approved(self, tmp_path): 321 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 322 store = PairingStore() 323 code = store.generate_code("telegram", "user1", "Alice") 324 store.approve_code("telegram", code) 325 approved = store.list_approved("telegram") 326 assert len(approved) == 1 327 assert approved[0]["user_id"] == "user1" 328 assert approved[0]["platform"] == "telegram" 329 330 def test_list_approved_all_platforms(self, tmp_path): 331 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 332 store = PairingStore() 333 c1 = store.generate_code("telegram", "user1") 334 store.approve_code("telegram", c1) 335 c2 = store.generate_code("discord", "user2") 336 store.approve_code("discord", c2) 337 approved = store.list_approved() 338 assert len(approved) == 2 339 340 def test_clear_pending(self, tmp_path): 341 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 342 store = PairingStore() 343 store.generate_code("telegram", "user1") 344 store.generate_code("telegram", "user2") 345 count = store.clear_pending("telegram") 346 remaining = store.list_pending("telegram") 347 assert count == 2 348 assert len(remaining) == 0 349 350 def test_clear_pending_all_platforms(self, tmp_path): 351 with patch("gateway.pairing.PAIRING_DIR", tmp_path): 352 store = PairingStore() 353 store.generate_code("telegram", "user1") 354 store.generate_code("discord", "user2") 355 count = store.clear_pending() 356 assert count == 2