# tests/gateway/test_feishu_comment_rules.py
  1  """Tests for feishu_comment_rules — 3-tier access control rule engine."""
  2  
  3  import json
  4  import os
  5  import tempfile
  6  import time
  7  import unittest
  8  from pathlib import Path
  9  from unittest.mock import patch
 10  
 11  from gateway.platforms.feishu_comment_rules import (
 12      CommentsConfig,
 13      CommentDocumentRule,
 14      ResolvedCommentRule,
 15      _MtimeCache,
 16      _parse_document_rule,
 17      has_wiki_keys,
 18      is_user_allowed,
 19      load_config,
 20      pairing_add,
 21      pairing_list,
 22      pairing_remove,
 23      resolve_rule,
 24  )
 25  
 26  
 27  class TestCommentDocumentRuleParsing(unittest.TestCase):
 28      def test_parse_full_rule(self):
 29          rule = _parse_document_rule({
 30              "enabled": False,
 31              "policy": "allowlist",
 32              "allow_from": ["ou_a", "ou_b"],
 33          })
 34          self.assertFalse(rule.enabled)
 35          self.assertEqual(rule.policy, "allowlist")
 36          self.assertEqual(rule.allow_from, frozenset(["ou_a", "ou_b"]))
 37  
 38      def test_parse_partial_rule(self):
 39          rule = _parse_document_rule({"policy": "allowlist"})
 40          self.assertIsNone(rule.enabled)
 41          self.assertEqual(rule.policy, "allowlist")
 42          self.assertIsNone(rule.allow_from)
 43  
 44      def test_parse_empty_rule(self):
 45          rule = _parse_document_rule({})
 46          self.assertIsNone(rule.enabled)
 47          self.assertIsNone(rule.policy)
 48          self.assertIsNone(rule.allow_from)
 49  
 50      def test_invalid_policy_ignored(self):
 51          rule = _parse_document_rule({"policy": "invalid_value"})
 52          self.assertIsNone(rule.policy)
 53  
 54  
 55  class TestResolveRule(unittest.TestCase):
 56      def test_exact_match(self):
 57          cfg = CommentsConfig(
 58              policy="pairing",
 59              allow_from=frozenset(["ou_top"]),
 60              documents={
 61                  "docx:abc": CommentDocumentRule(policy="allowlist"),
 62              },
 63          )
 64          rule = resolve_rule(cfg, "docx", "abc")
 65          self.assertEqual(rule.policy, "allowlist")
 66          self.assertTrue(rule.match_source.startswith("exact:"))
 67  
 68      def test_wildcard_match(self):
 69          cfg = CommentsConfig(
 70              policy="pairing",
 71              documents={
 72                  "*": CommentDocumentRule(policy="allowlist"),
 73              },
 74          )
 75          rule = resolve_rule(cfg, "docx", "unknown")
 76          self.assertEqual(rule.policy, "allowlist")
 77          self.assertEqual(rule.match_source, "wildcard")
 78  
 79      def test_top_level_fallback(self):
 80          cfg = CommentsConfig(policy="pairing", allow_from=frozenset(["ou_top"]))
 81          rule = resolve_rule(cfg, "docx", "whatever")
 82          self.assertEqual(rule.policy, "pairing")
 83          self.assertEqual(rule.allow_from, frozenset(["ou_top"]))
 84          self.assertEqual(rule.match_source, "top")
 85  
 86      def test_exact_overrides_wildcard(self):
 87          cfg = CommentsConfig(
 88              policy="pairing",
 89              documents={
 90                  "*": CommentDocumentRule(policy="pairing"),
 91                  "docx:abc": CommentDocumentRule(policy="allowlist"),
 92              },
 93          )
 94          rule = resolve_rule(cfg, "docx", "abc")
 95          self.assertEqual(rule.policy, "allowlist")
 96          self.assertTrue(rule.match_source.startswith("exact:"))
 97  
 98      def test_field_by_field_fallback(self):
 99          """Exact sets policy, wildcard sets allow_from, enabled from top."""
100          cfg = CommentsConfig(
101              enabled=True,
102              policy="pairing",
103              allow_from=frozenset(["ou_top"]),
104              documents={
105                  "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
106                  "docx:abc": CommentDocumentRule(policy="allowlist"),
107              },
108          )
109          rule = resolve_rule(cfg, "docx", "abc")
110          self.assertEqual(rule.policy, "allowlist")
111          self.assertEqual(rule.allow_from, frozenset(["ou_wildcard"]))
112          self.assertTrue(rule.enabled)
113  
114      def test_explicit_empty_allow_from_does_not_fall_through(self):
115          """allow_from=[] on exact should NOT inherit from wildcard or top."""
116          cfg = CommentsConfig(
117              allow_from=frozenset(["ou_top"]),
118              documents={
119                  "*": CommentDocumentRule(allow_from=frozenset(["ou_wildcard"])),
120                  "docx:abc": CommentDocumentRule(
121                      policy="allowlist",
122                      allow_from=frozenset(),
123                  ),
124              },
125          )
126          rule = resolve_rule(cfg, "docx", "abc")
127          self.assertEqual(rule.allow_from, frozenset())
128  
129      def test_wiki_token_match(self):
130          cfg = CommentsConfig(
131              policy="pairing",
132              documents={
133                  "wiki:WIKI123": CommentDocumentRule(policy="allowlist"),
134              },
135          )
136          rule = resolve_rule(cfg, "docx", "obj_token", wiki_token="WIKI123")
137          self.assertEqual(rule.policy, "allowlist")
138          self.assertTrue(rule.match_source.startswith("exact:wiki:"))
139  
140      def test_exact_takes_priority_over_wiki(self):
141          cfg = CommentsConfig(
142              documents={
143                  "docx:abc": CommentDocumentRule(policy="allowlist"),
144                  "wiki:WIKI123": CommentDocumentRule(policy="pairing"),
145              },
146          )
147          rule = resolve_rule(cfg, "docx", "abc", wiki_token="WIKI123")
148          self.assertEqual(rule.policy, "allowlist")
149          self.assertTrue(rule.match_source.startswith("exact:docx:"))
150  
151      def test_default_config(self):
152          cfg = CommentsConfig()
153          rule = resolve_rule(cfg, "docx", "anything")
154          self.assertTrue(rule.enabled)
155          self.assertEqual(rule.policy, "pairing")
156          self.assertEqual(rule.allow_from, frozenset())
157  
158  
class TestHasWikiKeys(unittest.TestCase):
    """Checks ``has_wiki_keys`` on configs with and without ``wiki:`` entries."""

    def test_no_wiki_keys(self):
        # Only a docx key and the wildcard are configured.
        documents = {
            "docx:abc": CommentDocumentRule(policy="allowlist"),
            "*": CommentDocumentRule(policy="pairing"),
        }
        config = CommentsConfig(documents=documents)
        self.assertFalse(has_wiki_keys(config))

    def test_has_wiki_keys(self):
        # A single "wiki:" key is enough for a True result.
        documents = {"wiki:WIKI123": CommentDocumentRule(policy="allowlist")}
        config = CommentsConfig(documents=documents)
        self.assertTrue(has_wiki_keys(config))

    def test_empty_documents(self):
        # Default config with no documents passed in.
        self.assertFalse(has_wiki_keys(CommentsConfig()))
176  
177  
class TestIsUserAllowed(unittest.TestCase):
    """Checks ``is_user_allowed`` under the allowlist and pairing policies."""

    @staticmethod
    def _top_rule(policy, allowed=()):
        # Build an enabled rule with match source "top"; arguments are passed
        # positionally in the order (enabled, policy, allow_from, match_source).
        return ResolvedCommentRule(True, policy, frozenset(allowed), "top")

    def test_allowlist_allows_listed(self):
        resolved = self._top_rule("allowlist", ["ou_a"])
        self.assertTrue(is_user_allowed(resolved, "ou_a"))

    def test_allowlist_denies_unlisted(self):
        resolved = self._top_rule("allowlist", ["ou_a"])
        self.assertFalse(is_user_allowed(resolved, "ou_b"))

    def test_allowlist_empty_denies_all(self):
        resolved = self._top_rule("allowlist")
        self.assertFalse(is_user_allowed(resolved, "ou_anyone"))

    def test_pairing_allows_in_allow_from(self):
        resolved = self._top_rule("pairing", ["ou_a"])
        self.assertTrue(is_user_allowed(resolved, "ou_a"))

    def test_pairing_checks_store(self):
        # With an empty allow_from, the decision follows the (patched) set of
        # pairing-approved users.
        resolved = self._top_rule("pairing")
        approved_store = patch(
            "gateway.platforms.feishu_comment_rules._load_pairing_approved",
            return_value={"ou_approved"},
        )
        with approved_store:
            self.assertTrue(is_user_allowed(resolved, "ou_approved"))
            self.assertFalse(is_user_allowed(resolved, "ou_unknown"))
203  
204  
class TestMtimeCache(unittest.TestCase):
    """Checks ``_MtimeCache.load`` for missing, unchanged and modified files."""

    def _write_temp_json(self, payload):
        """Dump *payload* into a new temp ``.json`` file and return its path.

        The file is removed by a cleanup registered as soon as it exists, so
        it is deleted even when a later step of the test fails.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
            path = Path(handle.name)
            self.addCleanup(path.unlink)
            json.dump(payload, handle)
        return path

    def test_returns_empty_dict_for_missing_file(self):
        cache = _MtimeCache(Path("/nonexistent/path.json"))
        self.assertEqual(cache.load(), {})

    def test_reads_file_and_caches(self):
        path = self._write_temp_json({"key": "value"})
        cache = _MtimeCache(path)
        self.assertEqual(cache.load(), {"key": "value"})
        # Second load with an unchanged file must return the same content.
        self.assertEqual(cache.load(), {"key": "value"})

    def test_reloads_on_mtime_change(self):
        path = self._write_temp_json({"v": 1})
        cache = _MtimeCache(path)
        self.assertEqual(cache.load(), {"v": 1})

        with open(path, "w") as rewritten:
            json.dump({"v": 2}, rewritten)
        # Move the mtime a full second past the file's own current value. This
        # guarantees a visible change without sleeping and without relying on
        # the wall clock agreeing with the filesystem clock.
        current = path.stat()
        os.utime(path, (current.st_atime, current.st_mtime + 1))
        self.assertEqual(cache.load(), {"v": 2})
242  
243  
class TestLoadConfig(unittest.TestCase):
    """Checks ``load_config`` with a rules file on disk and with none."""

    _MODULE = "gateway.platforms.feishu_comment_rules"

    def test_load_with_documents(self):
        # Top-level settings plus a wildcard entry and one exact document entry.
        rules = {
            "enabled": True,
            "policy": "allowlist",
            "allow_from": ["ou_a"],
            "documents": {
                "*": {"policy": "pairing"},
                "docx:abc": {"policy": "allowlist", "allow_from": ["ou_b"]},
            },
        }
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
            json.dump(rules, handle)
            rules_path = Path(handle.name)
        try:
            # Point both the module's file constant and its cache at the temp file.
            with patch(f"{self._MODULE}.RULES_FILE", rules_path), \
                    patch(f"{self._MODULE}._rules_cache", _MtimeCache(rules_path)):
                loaded = load_config()
            self.assertTrue(loaded.enabled)
            self.assertEqual(loaded.policy, "allowlist")
            self.assertEqual(loaded.allow_from, frozenset({"ou_a"}))
            for key in ("*", "docx:abc"):
                self.assertIn(key, loaded.documents)
            self.assertEqual(loaded.documents["docx:abc"].policy, "allowlist")
        finally:
            rules_path.unlink()

    def test_load_missing_file_returns_defaults(self):
        # A cache over a path that does not exist yields the default config.
        missing_cache = _MtimeCache(Path("/nonexistent"))
        with patch(f"{self._MODULE}._rules_cache", missing_cache):
            loaded = load_config()
        self.assertTrue(loaded.enabled)
        self.assertEqual(loaded.policy, "pairing")
        self.assertEqual(loaded.allow_from, frozenset())
        self.assertEqual(loaded.documents, {})
278  
279  
class TestPairingStore(unittest.TestCase):
    """Checks ``pairing_add`` / ``pairing_list`` / ``pairing_remove`` on a temp store.

    Each test gets its own directory holding a ``pairing.json`` seeded with an
    empty ``approved`` mapping. The module's ``PAIRING_FILE`` and
    ``_pairing_cache`` are patched to point at that file.
    """

    def setUp(self):
        # Teardown is registered with addCleanup as each resource is acquired,
        # so it runs even if a later line of setUp raises. Cleanups run in
        # reverse order: cache patch, file patch, then the directory.
        scratch = tempfile.TemporaryDirectory()
        # Recursive removal: unlike os.rmdir, this does not fail if the code
        # under test leaves extra files in the directory.
        self.addCleanup(scratch.cleanup)
        self._tmpdir = scratch.name
        self._pairing_file = Path(self._tmpdir) / "pairing.json"
        with open(self._pairing_file, "w") as f:
            json.dump({"approved": {}}, f)

        self._patcher_file = patch(
            "gateway.platforms.feishu_comment_rules.PAIRING_FILE",
            self._pairing_file,
        )
        self._patcher_cache = patch(
            "gateway.platforms.feishu_comment_rules._pairing_cache",
            _MtimeCache(self._pairing_file),
        )
        self._patcher_file.start()
        self.addCleanup(self._patcher_file.stop)
        self._patcher_cache.start()
        self.addCleanup(self._patcher_cache.stop)

    def test_add_and_list(self):
        # Adding a new user reports True and the user appears in the listing.
        self.assertTrue(pairing_add("ou_new"))
        approved = pairing_list()
        self.assertIn("ou_new", approved)

    def test_add_duplicate(self):
        # A second add of the same user reports False.
        pairing_add("ou_a")
        self.assertFalse(pairing_add("ou_a"))

    def test_remove(self):
        # Removing an existing user reports True and drops it from the listing.
        pairing_add("ou_a")
        self.assertTrue(pairing_remove("ou_a"))
        self.assertNotIn("ou_a", pairing_list())

    def test_remove_nonexistent(self):
        # Removing a user that was never added reports False.
        self.assertFalse(pairing_remove("ou_nobody"))
317  
318  
# Allow running this test module directly as a script; unittest.main()
# discovers and runs the TestCase classes defined in this module.
if __name__ == "__main__":
    unittest.main()