test_orchestrator.py
1 """ 2 Unit tests for orchestrator module. 3 4 Tests the orchestrator's core logic for evaluating NIP-35 events against rules, 5 including deterministic rules (D-*), probabilistic rules (P-*), and aggregation modes. 6 """ 7 8 import json 9 from unittest.mock import AsyncMock, patch 10 11 import pytest 12 13 from orchestrator import run_rule 14 15 16 @pytest.mark.unit 17 @pytest.mark.asyncio 18 async def test_run_rule_success_deterministic(sample_torrent_event): 19 """Test run_rule with a successful deterministic rule execution.""" 20 event_payload = json.dumps(sample_torrent_event) 21 22 # Mock subprocess to return deterministic result 23 with patch("asyncio.create_subprocess_exec") as mock_exec: 24 mock_proc = AsyncMock() 25 mock_proc.communicate = AsyncMock(return_value=( 26 b'{"passed": true}', 27 b'' 28 )) 29 mock_proc.returncode = 0 30 mock_exec.return_value = mock_proc 31 32 rule, result = await run_rule("D-SCHEMA-03", event_payload) 33 34 assert rule == "D-SCHEMA-03" 35 assert result["ok"] is True 36 assert result["data"]["passed"] is True 37 38 39 @pytest.mark.unit 40 @pytest.mark.asyncio 41 async def test_run_rule_success_probabilistic(sample_torrent_event): 42 """Test run_rule with a successful probabilistic rule execution.""" 43 event_payload = json.dumps(sample_torrent_event) 44 45 with patch("asyncio.create_subprocess_exec") as mock_exec: 46 mock_proc = AsyncMock() 47 mock_proc.communicate = AsyncMock(return_value=( 48 b'{"passed": true, "score": 0.85}', 49 b'' 50 )) 51 mock_proc.returncode = 0 52 mock_exec.return_value = mock_proc 53 54 rule, result = await run_rule("P-QUALITY-01", event_payload) 55 56 assert rule == "P-QUALITY-01" 57 assert result["ok"] is True 58 assert result["data"]["passed"] is True 59 assert result["data"]["score"] == 0.85 60 61 62 @pytest.mark.unit 63 @pytest.mark.asyncio 64 async def test_run_rule_timeout(sample_torrent_event): 65 """Test run_rule handles timeout correctly.""" 66 event_payload = json.dumps(sample_torrent_event) 67 68 with patch("asyncio.create_subprocess_exec") as mock_exec: 69 mock_proc = AsyncMock() 70 # Simulate timeout by never completing 71 mock_proc.communicate = AsyncMock(side_effect=Exception("Should timeout")) 72 mock_proc.kill = AsyncMock() 73 mock_proc.returncode = -9 74 mock_exec.return_value = mock_proc 75 76 # Patch wait_for to raise TimeoutError immediately 77 with patch("asyncio.wait_for", side_effect=TimeoutError): 78 mock_proc.communicate = AsyncMock(return_value=(b'', b'Killed')) 79 rule, result = await run_rule("D-SCHEMA-03", event_payload) 80 81 assert rule == "D-SCHEMA-03" 82 assert result["ok"] is False 83 assert result["error"] == "timeout" 84 assert "timeout_s" in result 85 86 87 @pytest.mark.unit 88 @pytest.mark.asyncio 89 async def test_run_rule_nonzero_exit(sample_torrent_event): 90 """Test run_rule handles non-zero exit code.""" 91 event_payload = json.dumps(sample_torrent_event) 92 93 with patch("asyncio.create_subprocess_exec") as mock_exec: 94 mock_proc = AsyncMock() 95 mock_proc.communicate = AsyncMock(return_value=( 96 b'', 97 b'ImportError: missing module' 98 )) 99 mock_proc.returncode = 1 100 mock_exec.return_value = mock_proc 101 102 rule, result = await run_rule("D-SCHEMA-03", event_payload) 103 104 assert rule == "D-SCHEMA-03" 105 assert result["ok"] is False 106 assert result["error"] == "nonzero_exit" 107 assert result["returncode"] == 1 108 assert "ImportError" in result["stderr"] 109 110 111 @pytest.mark.unit 112 @pytest.mark.asyncio 113 async def test_run_rule_invalid_json_output(sample_torrent_event): 114 """Test run_rule handles invalid JSON in stdout.""" 115 event_payload = json.dumps(sample_torrent_event) 116 117 with patch("asyncio.create_subprocess_exec") as mock_exec: 118 mock_proc = AsyncMock() 119 mock_proc.communicate = AsyncMock(return_value=( 120 b'NOT VALID JSON', 121 b'' 122 )) 123 mock_proc.returncode = 0 124 mock_exec.return_value = mock_proc 125 126 rule, result = await run_rule("D-SCHEMA-03", event_payload) 127 128 assert rule == "D-SCHEMA-03" 129 assert result["ok"] is False 130 assert result["error"] == "invalid_json_stdout" 131 assert "NOT VALID JSON" in result["stdout"] 132 133 134 @pytest.mark.unit 135 @pytest.mark.asyncio 136 async def test_run_rule_empty_stdout(sample_torrent_event): 137 """Test run_rule handles empty stdout (no output from rule).""" 138 event_payload = json.dumps(sample_torrent_event) 139 140 with patch("asyncio.create_subprocess_exec") as mock_exec: 141 mock_proc = AsyncMock() 142 mock_proc.communicate = AsyncMock(return_value=( 143 b'', 144 b'' 145 )) 146 mock_proc.returncode = 0 147 mock_exec.return_value = mock_proc 148 149 rule, result = await run_rule("D-SCHEMA-03", event_payload) 150 151 assert rule == "D-SCHEMA-03" 152 assert result["ok"] is True 153 assert result["data"] is None 154 155 156 @pytest.mark.unit 157 @pytest.mark.asyncio 158 async def test_run_rule_with_stderr_warning(sample_torrent_event): 159 """Test run_rule handles stderr output (warning/logging) but still succeeds.""" 160 event_payload = json.dumps(sample_torrent_event) 161 162 with patch("asyncio.create_subprocess_exec") as mock_exec: 163 mock_proc = AsyncMock() 164 mock_proc.communicate = AsyncMock(return_value=( 165 b'{"passed": true}', 166 b'Warning: deprecated function used' 167 )) 168 mock_proc.returncode = 0 169 mock_exec.return_value = mock_proc 170 171 rule, result = await run_rule("D-SCHEMA-03", event_payload) 172 173 assert rule == "D-SCHEMA-03" 174 assert result["ok"] is True 175 assert result["data"]["passed"] is True 176 assert "Warning" in result["stderr"] 177 178 179 @pytest.mark.unit 180 @pytest.mark.parametrize("rule_id,event_fixture,expected_passed", [ 181 ("D-SCHEMA-03", "sample_torrent_event", True), 182 ("D-SCHEMA-03", "sample_invalid_event", False), 183 ("P-QUALITY-01", "sample_torrent_event", True), 184 ]) 185 @pytest.mark.asyncio 186 async def test_run_rule_parametrized(rule_id, event_fixture, expected_passed, request): 187 """Parametrized test for multiple rules and event combinations.""" 188 event = request.getfixturevalue(event_fixture) 189 event_payload = json.dumps(event) 190 191 # Mock based on expected result 192 mock_result = {"passed": expected_passed} 193 if rule_id.startswith("P-"): 194 mock_result["score"] = 0.5 195 196 with patch("asyncio.create_subprocess_exec") as mock_exec: 197 mock_proc = AsyncMock() 198 mock_proc.communicate = AsyncMock(return_value=( 199 json.dumps(mock_result).encode(), 200 b'' 201 )) 202 mock_proc.returncode = 0 203 mock_exec.return_value = mock_proc 204 205 rule, result = await run_rule(rule_id, event_payload) 206 207 assert rule == rule_id 208 assert result["ok"] is True 209 assert result["data"]["passed"] == expected_passed