test_mcp_tools_analysis.py
1 """MCP tool tests for analysis.anomalies.check. 2 3 Locks in that the tool is registered on the aggregate MCP server, its 4 schema rejects malformed input, and the handler correctly composes 5 ``baseline_from_history`` with ``detect_anomalies`` over STATE.json. 6 Path sandboxing mirrors rollback.apply — ``state_file`` must resolve 7 inside CWD. 8 """ 9 10 from __future__ import annotations 11 12 import json 13 from pathlib import Path 14 15 import pytest 16 17 from mureo.context.models import ActionLogEntry, StateDocument 18 from mureo.context.state import write_state_file 19 from mureo.mcp.server import handle_list_tools 20 from mureo.mcp.tools_analysis import TOOLS, handle_tool 21 22 23 def _write_state(path: Path, entries: list[ActionLogEntry]) -> None: 24 write_state_file(path, StateDocument(version="2", action_log=tuple(entries))) 25 26 27 def _history_entry( 28 *, 29 cost: float = 10_000, 30 impressions: int = 50_000, 31 clicks: int = 500, 32 conversions: float = 50, 33 cpa: float | None = None, 34 ctr: float | None = None, 35 timestamp: str = "2026-04-08", 36 campaign_id: str = "C1", 37 ) -> ActionLogEntry: 38 metrics: dict[str, float] = { 39 "cost": cost, 40 "impressions": impressions, 41 "clicks": clicks, 42 "conversions": conversions, 43 } 44 if cpa is not None: 45 metrics["cpa"] = cpa 46 if ctr is not None: 47 metrics["ctr"] = ctr 48 return ActionLogEntry( 49 timestamp=timestamp, 50 action="adjust_bid", 51 platform="google_ads", 52 campaign_id=campaign_id, 53 metrics_at_action=metrics, 54 ) 55 56 57 @pytest.fixture 58 def sandboxed_cwd(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: 59 monkeypatch.chdir(tmp_path) 60 return tmp_path 61 62 63 @pytest.mark.unit 64 class TestToolRegistration: 65 @pytest.mark.asyncio 66 async def test_registered_in_server(self) -> None: 67 tools = await handle_list_tools() 68 names = {t.name for t in tools} 69 assert "analysis.anomalies.check" in names 70 71 def test_schema_requires_current(self) -> None: 72 tool = next(t for t in TOOLS if t.name == "analysis.anomalies.check") 73 schema = tool.inputSchema 74 assert "current" in schema["required"] 75 current_props = schema["properties"]["current"]["properties"] 76 assert "campaign_id" in current_props 77 78 79 @pytest.mark.unit 80 class TestAnomalyHandler: 81 @pytest.mark.asyncio 82 async def test_zero_spend_detected_without_history( 83 self, sandboxed_cwd: Path 84 ) -> None: 85 """With no STATE.json, zero-spend still fires as CRITICAL.""" 86 result = await handle_tool( 87 "analysis.anomalies.check", 88 { 89 "current": { 90 "campaign_id": "C1", 91 "cost": 0, 92 "impressions": 0, 93 "clicks": 0, 94 "conversions": 0, 95 }, 96 "had_prior_spend": True, 97 }, 98 ) 99 payload = json.loads(result[0].text) 100 assert payload["campaign_id"] == "C1" 101 assert payload["baseline"] is None 102 severities = {a["severity"] for a in payload["anomalies"]} 103 assert "critical" in severities 104 metrics = {a["metric"] for a in payload["anomalies"]} 105 assert "cost" in metrics 106 107 @pytest.mark.asyncio 108 async def test_cpa_spike_detected_with_history(self, sandboxed_cwd: Path) -> None: 109 """CPA spike of 2.3x the historical median fires CRITICAL.""" 110 entries = [ 111 _history_entry(cost=10_000, conversions=50, cpa=5000) for _ in range(7) 112 ] 113 _write_state(sandboxed_cwd / "STATE.json", entries) 114 115 result = await handle_tool( 116 "analysis.anomalies.check", 117 { 118 "current": { 119 "campaign_id": "C1", 120 "cost": 30_000, 121 "impressions": 50_000, 122 "clicks": 500, 123 "conversions": 60, 124 "cpa": 11500, 125 }, 126 "had_prior_spend": True, 127 }, 128 ) 129 payload = json.loads(result[0].text) 130 assert payload["baseline"] is not None 131 cpa_anomaly = next( 132 a for a in payload["anomalies"] if a["metric"] == "cpa" 133 ) 134 assert cpa_anomaly["severity"] == "critical" 135 136 @pytest.mark.asyncio 137 async def test_insufficient_history_no_baseline( 138 self, sandboxed_cwd: Path 139 ) -> None: 140 """Below min_baseline_entries, baseline is None and CPA is not evaluated.""" 141 _write_state( 142 sandboxed_cwd / "STATE.json", 143 [_history_entry(cost=10_000, conversions=50, cpa=5000)], 144 ) 145 result = await handle_tool( 146 "analysis.anomalies.check", 147 { 148 "current": { 149 "campaign_id": "C1", 150 "cost": 30_000, 151 "impressions": 50_000, 152 "clicks": 500, 153 "conversions": 60, 154 "cpa": 11500, 155 }, 156 "had_prior_spend": True, 157 }, 158 ) 159 payload = json.loads(result[0].text) 160 assert payload["baseline"] is None 161 assert all(a["metric"] != "cpa" for a in payload["anomalies"]) 162 163 @pytest.mark.asyncio 164 async def test_missing_campaign_id_raises(self, sandboxed_cwd: Path) -> None: 165 with pytest.raises(ValueError, match="campaign_id"): 166 await handle_tool( 167 "analysis.anomalies.check", 168 {"current": {"cost": 0}}, 169 ) 170 171 @pytest.mark.asyncio 172 async def test_missing_current_raises(self, sandboxed_cwd: Path) -> None: 173 with pytest.raises(ValueError, match="current"): 174 await handle_tool("analysis.anomalies.check", {}) 175 176 @pytest.mark.asyncio 177 async def test_path_traversal_refused(self, sandboxed_cwd: Path) -> None: 178 outside = sandboxed_cwd.parent / "rogue_STATE.json" 179 _write_state(outside, []) 180 result = await handle_tool( 181 "analysis.anomalies.check", 182 { 183 "current": {"campaign_id": "C1", "cost": 0}, 184 "state_file": str(outside), 185 }, 186 ) 187 payload = json.loads(result[0].text) 188 assert ( 189 "error" in payload 190 and "current working directory" in payload["error"] 191 ) 192 193 @pytest.mark.asyncio 194 async def test_ctr_drop_detected_with_history(self, sandboxed_cwd: Path) -> None: 195 """CTR drop to 0.3x baseline fires CRITICAL.""" 196 entries = [ 197 _history_entry( 198 cost=10_000, 199 impressions=50_000, 200 clicks=500, 201 ctr=0.01, 202 conversions=50, 203 ) 204 for _ in range(7) 205 ] 206 _write_state(sandboxed_cwd / "STATE.json", entries) 207 208 result = await handle_tool( 209 "analysis.anomalies.check", 210 { 211 "current": { 212 "campaign_id": "C1", 213 "cost": 10_000, 214 "impressions": 50_000, 215 "clicks": 100, # 0.2% CTR = 0.2× baseline 216 "conversions": 50, 217 "ctr": 0.002, 218 }, 219 "had_prior_spend": True, 220 }, 221 ) 222 payload = json.loads(result[0].text) 223 ctr_anomaly = next(a for a in payload["anomalies"] if a["metric"] == "ctr") 224 assert ctr_anomaly["severity"] == "critical" 225 226 @pytest.mark.asyncio 227 async def test_had_prior_spend_false_suppresses_zero_spend( 228 self, sandboxed_cwd: Path 229 ) -> None: 230 """Fresh campaigns never trigger zero-spend alerts.""" 231 result = await handle_tool( 232 "analysis.anomalies.check", 233 { 234 "current": {"campaign_id": "C_NEW", "cost": 0}, 235 "had_prior_spend": False, 236 }, 237 ) 238 payload = json.loads(result[0].text) 239 assert payload["anomalies"] == [] 240 241 @pytest.mark.asyncio 242 async def test_symlink_state_file_refused(self, sandboxed_cwd: Path) -> None: 243 """A symlink inside CWD pointing outside must be refused even if the 244 resolved target happens to also live under CWD (defense in depth: 245 swapping the symlink target is agent-writable).""" 246 outside = sandboxed_cwd.parent / "rogue_STATE.json" 247 _write_state(outside, []) 248 link = sandboxed_cwd / "STATE.json" 249 link.symlink_to(outside) 250 251 result = await handle_tool( 252 "analysis.anomalies.check", 253 {"current": {"campaign_id": "C1", "cost": 0}, "state_file": "STATE.json"}, 254 ) 255 payload = json.loads(result[0].text) 256 assert "error" in payload 257 258 @pytest.mark.asyncio 259 async def test_min_baseline_entries_zero_refused( 260 self, sandboxed_cwd: Path 261 ) -> None: 262 with pytest.raises(ValueError, match="min_baseline_entries"): 263 await handle_tool( 264 "analysis.anomalies.check", 265 { 266 "current": {"campaign_id": "C1", "cost": 0}, 267 "min_baseline_entries": 0, 268 }, 269 ) 270 271 @pytest.mark.asyncio 272 async def test_malformed_metrics_row_tolerated( 273 self, sandboxed_cwd: Path 274 ) -> None: 275 """A history entry with string-typed / N/A metrics must not break detection.""" 276 entries: list[ActionLogEntry] = [ 277 ActionLogEntry( 278 timestamp=f"2026-04-0{i}", 279 action="adjust_bid", 280 platform="google_ads", 281 campaign_id="C1", 282 metrics_at_action={ 283 "cost": "N/A" if i == 0 else 10_000, 284 "conversions": 50, 285 "cpa": 5000, 286 "impressions": 50_000, 287 "clicks": 500, 288 "ctr": 0.01, 289 }, 290 ) 291 for i in range(8) 292 ] 293 _write_state(sandboxed_cwd / "STATE.json", entries) 294 295 result = await handle_tool( 296 "analysis.anomalies.check", 297 { 298 "current": { 299 "campaign_id": "C1", 300 "cost": 10_000, 301 "impressions": 50_000, 302 "clicks": 500, 303 "conversions": 50, 304 }, 305 "had_prior_spend": True, 306 }, 307 ) 308 payload = json.loads(result[0].text) 309 assert payload["baseline"] is not None