test_feishu_comment_rules.py
1 """Tests for feishu_comment_rules — 3-tier access control rule engine.""" 2 3 import json 4 import os 5 import tempfile 6 import time 7 import unittest 8 from pathlib import Path 9 from unittest.mock import patch 10 11 from gateway.platforms.feishu_comment_rules import ( 12 CommentsConfig, 13 CommentDocumentRule, 14 ResolvedCommentRule, 15 _MtimeCache, 16 _parse_document_rule, 17 has_wiki_keys, 18 is_user_allowed, 19 load_config, 20 pairing_add, 21 pairing_list, 22 pairing_remove, 23 resolve_rule, 24 ) 25 26 27 class TestCommentDocumentRuleParsing(unittest.TestCase): 28 def test_parse_full_rule(self): 29 rule = _parse_document_rule({ 30 "enabled": False, 31 "policy": "allowlist", 32 "allow_from": ["ou_a", "ou_b"], 33 }) 34 self.assertFalse(rule.enabled) 35 self.assertEqual(rule.policy, "allowlist") 36 self.assertEqual(rule.allow_from, frozenset(["ou_a", "ou_b"])) 37 38 def test_parse_partial_rule(self): 39 rule = _parse_document_rule({"policy": "allowlist"}) 40 self.assertIsNone(rule.enabled) 41 self.assertEqual(rule.policy, "allowlist") 42 self.assertIsNone(rule.allow_from) 43 44 def test_parse_empty_rule(self): 45 rule = _parse_document_rule({}) 46 self.assertIsNone(rule.enabled) 47 self.assertIsNone(rule.policy) 48 self.assertIsNone(rule.allow_from) 49 50 def test_invalid_policy_ignored(self): 51 rule = _parse_document_rule({"policy": "invalid_value"}) 52 self.assertIsNone(rule.policy) 53 54 55 class TestResolveRule(unittest.TestCase): 56 def test_exact_match(self): 57 cfg = CommentsConfig( 58 policy="pairing", 59 allow_from=frozenset(["ou_top"]), 60 documents={ 61 "docx:abc": CommentDocumentRule(policy="allowlist"), 62 }, 63 ) 64 rule = resolve_rule(cfg, "docx", "abc") 65 self.assertEqual(rule.policy, "allowlist") 66 self.assertTrue(rule.match_source.startswith("exact:")) 67 68 def test_wildcard_match(self): 69 cfg = CommentsConfig( 70 policy="pairing", 71 documents={ 72 "*": CommentDocumentRule(policy="allowlist"), 73 }, 74 ) 75 rule = resolve_rule(cfg, "docx", "unknown") 76 self.assertEqual(rule.policy, "allowlist") 77 self.assertEqual(rule.match_source, "wildcard") 78 79 def test_top_level_fallback(self): 80 cfg = CommentsConfig(policy="pairing", allow_from=frozenset(["ou_top"])) 81 rule = resolve_rule(cfg, "docx", "whatever") 82 self.assertEqual(rule.policy, "pairing") 83 self.assertEqual(rule.allow_from, frozenset(["ou_top"])) 84 self.assertEqual(rule.match_source, "top") 85 86 def test_exact_overrides_wildcard(self): 87 cfg = CommentsConfig( 88 policy="pairing", 89 documents={ 90 "*": CommentDocumentRule(policy="pairing"), 91 "docx:abc": CommentDocumentRule(policy="allowlist"), 92 }, 93 ) 94 rule = resolve_rule(cfg, "docx", "abc") 95 self.assertEqual(rule.policy, "allowlist") 96 self.assertTrue(rule.match_source.startswith("exact:")) 97 98 def test_field_by_field_fallback(self): 99 """Exact sets policy, wildcard sets allow_from, enabled from top.""" 100 cfg = CommentsConfig( 101 enabled=True, 102 policy="pairing", 103 allow_from=frozenset(["ou_top"]), 104 documents={ 105 "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])), 106 "docx:abc": CommentDocumentRule(policy="allowlist"), 107 }, 108 ) 109 rule = resolve_rule(cfg, "docx", "abc") 110 self.assertEqual(rule.policy, "allowlist") 111 self.assertEqual(rule.allow_from, frozenset(["ou_wildcard"])) 112 self.assertTrue(rule.enabled) 113 114 def test_explicit_empty_allow_from_does_not_fall_through(self): 115 """allow_from=[] on exact should NOT inherit from wildcard or top.""" 116 cfg = CommentsConfig( 117 allow_from=frozenset(["ou_top"]), 118 documents={ 119 "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])), 120 "docx:abc": CommentDocumentRule( 121 policy="allowlist", 122 allow_from=frozenset(), 123 ), 124 }, 125 ) 126 rule = resolve_rule(cfg, "docx", "abc") 127 self.assertEqual(rule.allow_from, frozenset()) 128 129 def test_wiki_token_match(self): 130 cfg = CommentsConfig( 131 policy="pairing", 132 documents={ 133 "wiki:WIKI123": CommentDocumentRule(policy="allowlist"), 134 }, 135 ) 136 rule = resolve_rule(cfg, "docx", "obj_token", wiki_token="WIKI123") 137 self.assertEqual(rule.policy, "allowlist") 138 self.assertTrue(rule.match_source.startswith("exact:wiki:")) 139 140 def test_exact_takes_priority_over_wiki(self): 141 cfg = CommentsConfig( 142 documents={ 143 "docx:abc": CommentDocumentRule(policy="allowlist"), 144 "wiki:WIKI123": CommentDocumentRule(policy="pairing"), 145 }, 146 ) 147 rule = resolve_rule(cfg, "docx", "abc", wiki_token="WIKI123") 148 self.assertEqual(rule.policy, "allowlist") 149 self.assertTrue(rule.match_source.startswith("exact:docx:")) 150 151 def test_default_config(self): 152 cfg = CommentsConfig() 153 rule = resolve_rule(cfg, "docx", "anything") 154 self.assertTrue(rule.enabled) 155 self.assertEqual(rule.policy, "pairing") 156 self.assertEqual(rule.allow_from, frozenset()) 157 158 159 class TestHasWikiKeys(unittest.TestCase): 160 def test_no_wiki_keys(self): 161 cfg = CommentsConfig(documents={ 162 "docx:abc": CommentDocumentRule(policy="allowlist"), 163 "*": CommentDocumentRule(policy="pairing"), 164 }) 165 self.assertFalse(has_wiki_keys(cfg)) 166 167 def test_has_wiki_keys(self): 168 cfg = CommentsConfig(documents={ 169 "wiki:WIKI123": CommentDocumentRule(policy="allowlist"), 170 }) 171 self.assertTrue(has_wiki_keys(cfg)) 172 173 def test_empty_documents(self): 174 cfg = CommentsConfig() 175 self.assertFalse(has_wiki_keys(cfg)) 176 177 178 class TestIsUserAllowed(unittest.TestCase): 179 def test_allowlist_allows_listed(self): 180 rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top") 181 self.assertTrue(is_user_allowed(rule, "ou_a")) 182 183 def test_allowlist_denies_unlisted(self): 184 rule = ResolvedCommentRule(True, "allowlist", frozenset(["ou_a"]), "top") 185 self.assertFalse(is_user_allowed(rule, "ou_b")) 186 187 def test_allowlist_empty_denies_all(self): 188 rule = ResolvedCommentRule(True, "allowlist", frozenset(), "top") 189 self.assertFalse(is_user_allowed(rule, "ou_anyone")) 190 191 def test_pairing_allows_in_allow_from(self): 192 rule = ResolvedCommentRule(True, "pairing", frozenset(["ou_a"]), "top") 193 self.assertTrue(is_user_allowed(rule, "ou_a")) 194 195 def test_pairing_checks_store(self): 196 rule = ResolvedCommentRule(True, "pairing", frozenset(), "top") 197 with patch( 198 "gateway.platforms.feishu_comment_rules._load_pairing_approved", 199 return_value={"ou_approved"}, 200 ): 201 self.assertTrue(is_user_allowed(rule, "ou_approved")) 202 self.assertFalse(is_user_allowed(rule, "ou_unknown")) 203 204 205 class TestMtimeCache(unittest.TestCase): 206 def test_returns_empty_dict_for_missing_file(self): 207 cache = _MtimeCache(Path("/nonexistent/path.json")) 208 self.assertEqual(cache.load(), {}) 209 210 def test_reads_file_and_caches(self): 211 with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: 212 json.dump({"key": "value"}, f) 213 f.flush() 214 path = Path(f.name) 215 try: 216 cache = _MtimeCache(path) 217 data = cache.load() 218 self.assertEqual(data, {"key": "value"}) 219 # Second load should use cache (same mtime) 220 data2 = cache.load() 221 self.assertEqual(data2, {"key": "value"}) 222 finally: 223 path.unlink() 224 225 def test_reloads_on_mtime_change(self): 226 with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: 227 json.dump({"v": 1}, f) 228 f.flush() 229 path = Path(f.name) 230 try: 231 cache = _MtimeCache(path) 232 self.assertEqual(cache.load(), {"v": 1}) 233 # Modify file 234 time.sleep(0.05) 235 with open(path, "w") as f2: 236 json.dump({"v": 2}, f2) 237 # Force mtime change detection 238 os.utime(path, (time.time() + 1, time.time() + 1)) 239 self.assertEqual(cache.load(), {"v": 2}) 240 finally: 241 path.unlink() 242 243 244 class TestLoadConfig(unittest.TestCase): 245 def test_load_with_documents(self): 246 raw = { 247 "enabled": True, 248 "policy": "allowlist", 249 "allow_from": ["ou_a"], 250 "documents": { 251 "*": {"policy": "pairing"}, 252 "docx:abc": {"policy": "allowlist", "allow_from": ["ou_b"]}, 253 }, 254 } 255 with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: 256 json.dump(raw, f) 257 path = Path(f.name) 258 try: 259 with patch("gateway.platforms.feishu_comment_rules.RULES_FILE", path): 260 with patch("gateway.platforms.feishu_comment_rules._rules_cache", _MtimeCache(path)): 261 cfg = load_config() 262 self.assertTrue(cfg.enabled) 263 self.assertEqual(cfg.policy, "allowlist") 264 self.assertEqual(cfg.allow_from, frozenset(["ou_a"])) 265 self.assertIn("*", cfg.documents) 266 self.assertIn("docx:abc", cfg.documents) 267 self.assertEqual(cfg.documents["docx:abc"].policy, "allowlist") 268 finally: 269 path.unlink() 270 271 def test_load_missing_file_returns_defaults(self): 272 with patch("gateway.platforms.feishu_comment_rules._rules_cache", _MtimeCache(Path("/nonexistent"))): 273 cfg = load_config() 274 self.assertTrue(cfg.enabled) 275 self.assertEqual(cfg.policy, "pairing") 276 self.assertEqual(cfg.allow_from, frozenset()) 277 self.assertEqual(cfg.documents, {}) 278 279 280 class TestPairingStore(unittest.TestCase): 281 def setUp(self): 282 self._tmpdir = tempfile.mkdtemp() 283 self._pairing_file = Path(self._tmpdir) / "pairing.json" 284 with open(self._pairing_file, "w") as f: 285 json.dump({"approved": {}}, f) 286 self._patcher_file = patch("gateway.platforms.feishu_comment_rules.PAIRING_FILE", self._pairing_file) 287 self._patcher_cache = patch( 288 "gateway.platforms.feishu_comment_rules._pairing_cache", 289 _MtimeCache(self._pairing_file), 290 ) 291 self._patcher_file.start() 292 self._patcher_cache.start() 293 294 def tearDown(self): 295 self._patcher_cache.stop() 296 self._patcher_file.stop() 297 if self._pairing_file.exists(): 298 self._pairing_file.unlink() 299 os.rmdir(self._tmpdir) 300 301 def test_add_and_list(self): 302 self.assertTrue(pairing_add("ou_new")) 303 approved = pairing_list() 304 self.assertIn("ou_new", approved) 305 306 def test_add_duplicate(self): 307 pairing_add("ou_a") 308 self.assertFalse(pairing_add("ou_a")) 309 310 def test_remove(self): 311 pairing_add("ou_a") 312 self.assertTrue(pairing_remove("ou_a")) 313 self.assertNotIn("ou_a", pairing_list()) 314 315 def test_remove_nonexistent(self): 316 self.assertFalse(pairing_remove("ou_nobody")) 317 318 319 if __name__ == "__main__": 320 unittest.main()