# tests/unit/test_orchestrator.py
  1  """
  2  Unit tests for orchestrator module.
  3  
  4  Tests the orchestrator's core logic for evaluating NIP-35 events against rules,
  5  including deterministic rules (D-*), probabilistic rules (P-*), and aggregation modes.
  6  """
  7  
import json
from unittest.mock import AsyncMock, Mock, patch

import pytest

from orchestrator import run_rule
 14  
 15  
 16  @pytest.mark.unit
 17  @pytest.mark.asyncio
 18  async def test_run_rule_success_deterministic(sample_torrent_event):
 19      """Test run_rule with a successful deterministic rule execution."""
 20      event_payload = json.dumps(sample_torrent_event)
 21  
 22      # Mock subprocess to return deterministic result
 23      with patch("asyncio.create_subprocess_exec") as mock_exec:
 24          mock_proc = AsyncMock()
 25          mock_proc.communicate = AsyncMock(return_value=(
 26              b'{"passed": true}',
 27              b''
 28          ))
 29          mock_proc.returncode = 0
 30          mock_exec.return_value = mock_proc
 31  
 32          rule, result = await run_rule("D-SCHEMA-03", event_payload)
 33  
 34          assert rule == "D-SCHEMA-03"
 35          assert result["ok"] is True
 36          assert result["data"]["passed"] is True
 37  
 38  
 39  @pytest.mark.unit
 40  @pytest.mark.asyncio
 41  async def test_run_rule_success_probabilistic(sample_torrent_event):
 42      """Test run_rule with a successful probabilistic rule execution."""
 43      event_payload = json.dumps(sample_torrent_event)
 44  
 45      with patch("asyncio.create_subprocess_exec") as mock_exec:
 46          mock_proc = AsyncMock()
 47          mock_proc.communicate = AsyncMock(return_value=(
 48              b'{"passed": true, "score": 0.85}',
 49              b''
 50          ))
 51          mock_proc.returncode = 0
 52          mock_exec.return_value = mock_proc
 53  
 54          rule, result = await run_rule("P-QUALITY-01", event_payload)
 55  
 56          assert rule == "P-QUALITY-01"
 57          assert result["ok"] is True
 58          assert result["data"]["passed"] is True
 59          assert result["data"]["score"] == 0.85
 60  
 61  
 62  @pytest.mark.unit
 63  @pytest.mark.asyncio
 64  async def test_run_rule_timeout(sample_torrent_event):
 65      """Test run_rule handles timeout correctly."""
 66      event_payload = json.dumps(sample_torrent_event)
 67  
 68      with patch("asyncio.create_subprocess_exec") as mock_exec:
 69          mock_proc = AsyncMock()
 70          # Simulate timeout by never completing
 71          mock_proc.communicate = AsyncMock(side_effect=Exception("Should timeout"))
 72          mock_proc.kill = AsyncMock()
 73          mock_proc.returncode = -9
 74          mock_exec.return_value = mock_proc
 75  
 76          # Patch wait_for to raise TimeoutError immediately
 77          with patch("asyncio.wait_for", side_effect=TimeoutError):
 78              mock_proc.communicate = AsyncMock(return_value=(b'', b'Killed'))
 79              rule, result = await run_rule("D-SCHEMA-03", event_payload)
 80  
 81              assert rule == "D-SCHEMA-03"
 82              assert result["ok"] is False
 83              assert result["error"] == "timeout"
 84              assert "timeout_s" in result
 85  
 86  
 87  @pytest.mark.unit
 88  @pytest.mark.asyncio
 89  async def test_run_rule_nonzero_exit(sample_torrent_event):
 90      """Test run_rule handles non-zero exit code."""
 91      event_payload = json.dumps(sample_torrent_event)
 92  
 93      with patch("asyncio.create_subprocess_exec") as mock_exec:
 94          mock_proc = AsyncMock()
 95          mock_proc.communicate = AsyncMock(return_value=(
 96              b'',
 97              b'ImportError: missing module'
 98          ))
 99          mock_proc.returncode = 1
100          mock_exec.return_value = mock_proc
101  
102          rule, result = await run_rule("D-SCHEMA-03", event_payload)
103  
104          assert rule == "D-SCHEMA-03"
105          assert result["ok"] is False
106          assert result["error"] == "nonzero_exit"
107          assert result["returncode"] == 1
108          assert "ImportError" in result["stderr"]
109  
110  
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_rule_invalid_json_output(sample_torrent_event):
    """Test run_rule handles invalid JSON in stdout."""
    payload = json.dumps(sample_torrent_event)

    # Stub the rule subprocess: clean exit but garbage on stdout.
    with patch("asyncio.create_subprocess_exec") as spawn:
        proc = AsyncMock()
        proc.communicate = AsyncMock(return_value=(b'NOT VALID JSON', b''))
        proc.returncode = 0
        spawn.return_value = proc

        rule, result = await run_rule("D-SCHEMA-03", payload)

    assert rule == "D-SCHEMA-03"
    assert result["ok"] is False
    assert result["error"] == "invalid_json_stdout"
    assert "NOT VALID JSON" in result["stdout"]
132  
133  
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_rule_empty_stdout(sample_torrent_event):
    """Test run_rule handles empty stdout (no output from rule)."""
    payload = json.dumps(sample_torrent_event)

    # Stub the rule subprocess: clean exit with no output at all.
    with patch("asyncio.create_subprocess_exec") as spawn:
        proc = AsyncMock()
        proc.communicate = AsyncMock(return_value=(b'', b''))
        proc.returncode = 0
        spawn.return_value = proc

        rule, result = await run_rule("D-SCHEMA-03", payload)

    assert rule == "D-SCHEMA-03"
    assert result["ok"] is True
    assert result["data"] is None
154  
155  
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_rule_with_stderr_warning(sample_torrent_event):
    """Test run_rule handles stderr output (warning/logging) but still succeeds."""
    payload = json.dumps(sample_torrent_event)
    rule_output = (b'{"passed": true}', b'Warning: deprecated function used')

    # Stub the rule subprocess: clean exit, valid verdict, noise on stderr.
    with patch("asyncio.create_subprocess_exec") as spawn:
        proc = AsyncMock()
        proc.communicate = AsyncMock(return_value=rule_output)
        proc.returncode = 0
        spawn.return_value = proc

        rule, result = await run_rule("D-SCHEMA-03", payload)

    assert rule == "D-SCHEMA-03"
    assert result["ok"] is True
    assert result["data"]["passed"] is True
    assert "Warning" in result["stderr"]
177  
178  
@pytest.mark.unit
@pytest.mark.parametrize("rule_id,event_fixture,expected_passed", [
    ("D-SCHEMA-03", "sample_torrent_event", True),
    ("D-SCHEMA-03", "sample_invalid_event", False),
    ("P-QUALITY-01", "sample_torrent_event", True),
])
@pytest.mark.asyncio
async def test_run_rule_parametrized(rule_id, event_fixture, expected_passed, request):
    """Parametrized test for multiple rules and event combinations."""
    payload = json.dumps(request.getfixturevalue(event_fixture))

    # Probabilistic (P-*) rules additionally report a score.
    verdict = {"passed": expected_passed}
    if rule_id.startswith("P-"):
        verdict["score"] = 0.5

    # Stub the rule subprocess: clean exit with the expected verdict.
    with patch("asyncio.create_subprocess_exec") as spawn:
        proc = AsyncMock()
        proc.communicate = AsyncMock(
            return_value=(json.dumps(verdict).encode(), b'')
        )
        proc.returncode = 0
        spawn.return_value = proc

        rule, result = await run_rule(rule_id, payload)

    assert rule == rule_id
    assert result["ok"] is True
    assert result["data"]["passed"] == expected_passed