# tests/test_mcp_tools_analysis.py
  1  """MCP tool tests for analysis.anomalies.check.
  2  
  3  Locks in that the tool is registered on the aggregate MCP server, its
  4  schema rejects malformed input, and the handler correctly composes
  5  ``baseline_from_history`` with ``detect_anomalies`` over STATE.json.
  6  Path sandboxing mirrors rollback.apply — ``state_file`` must resolve
  7  inside CWD.
  8  """
  9  
 10  from __future__ import annotations
 11  
 12  import json
 13  from pathlib import Path
 14  
 15  import pytest
 16  
 17  from mureo.context.models import ActionLogEntry, StateDocument
 18  from mureo.context.state import write_state_file
 19  from mureo.mcp.server import handle_list_tools
 20  from mureo.mcp.tools_analysis import TOOLS, handle_tool
 21  
 22  
 23  def _write_state(path: Path, entries: list[ActionLogEntry]) -> None:
 24      write_state_file(path, StateDocument(version="2", action_log=tuple(entries)))
 25  
 26  
 27  def _history_entry(
 28      *,
 29      cost: float = 10_000,
 30      impressions: int = 50_000,
 31      clicks: int = 500,
 32      conversions: float = 50,
 33      cpa: float | None = None,
 34      ctr: float | None = None,
 35      timestamp: str = "2026-04-08",
 36      campaign_id: str = "C1",
 37  ) -> ActionLogEntry:
 38      metrics: dict[str, float] = {
 39          "cost": cost,
 40          "impressions": impressions,
 41          "clicks": clicks,
 42          "conversions": conversions,
 43      }
 44      if cpa is not None:
 45          metrics["cpa"] = cpa
 46      if ctr is not None:
 47          metrics["ctr"] = ctr
 48      return ActionLogEntry(
 49          timestamp=timestamp,
 50          action="adjust_bid",
 51          platform="google_ads",
 52          campaign_id=campaign_id,
 53          metrics_at_action=metrics,
 54      )
 55  
 56  
@pytest.fixture
def sandboxed_cwd(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Run the test with CWD switched to an isolated temp directory.

    The handler resolves ``state_file`` (and the default STATE.json)
    against the current working directory, so every handler test must
    execute from a throwaway directory to keep the sandbox checks honest.
    """
    monkeypatch.chdir(tmp_path)
    return tmp_path
 61  
 62  
 63  @pytest.mark.unit
 64  class TestToolRegistration:
 65      @pytest.mark.asyncio
 66      async def test_registered_in_server(self) -> None:
 67          tools = await handle_list_tools()
 68          names = {t.name for t in tools}
 69          assert "analysis.anomalies.check" in names
 70  
 71      def test_schema_requires_current(self) -> None:
 72          tool = next(t for t in TOOLS if t.name == "analysis.anomalies.check")
 73          schema = tool.inputSchema
 74          assert "current" in schema["required"]
 75          current_props = schema["properties"]["current"]["properties"]
 76          assert "campaign_id" in current_props
 77  
 78  
 79  @pytest.mark.unit
 80  class TestAnomalyHandler:
 81      @pytest.mark.asyncio
 82      async def test_zero_spend_detected_without_history(
 83          self, sandboxed_cwd: Path
 84      ) -> None:
 85          """With no STATE.json, zero-spend still fires as CRITICAL."""
 86          result = await handle_tool(
 87              "analysis.anomalies.check",
 88              {
 89                  "current": {
 90                      "campaign_id": "C1",
 91                      "cost": 0,
 92                      "impressions": 0,
 93                      "clicks": 0,
 94                      "conversions": 0,
 95                  },
 96                  "had_prior_spend": True,
 97              },
 98          )
 99          payload = json.loads(result[0].text)
100          assert payload["campaign_id"] == "C1"
101          assert payload["baseline"] is None
102          severities = {a["severity"] for a in payload["anomalies"]}
103          assert "critical" in severities
104          metrics = {a["metric"] for a in payload["anomalies"]}
105          assert "cost" in metrics
106  
107      @pytest.mark.asyncio
108      async def test_cpa_spike_detected_with_history(self, sandboxed_cwd: Path) -> None:
109          """CPA spike of 2.3x the historical median fires CRITICAL."""
110          entries = [
111              _history_entry(cost=10_000, conversions=50, cpa=5000) for _ in range(7)
112          ]
113          _write_state(sandboxed_cwd / "STATE.json", entries)
114  
115          result = await handle_tool(
116              "analysis.anomalies.check",
117              {
118                  "current": {
119                      "campaign_id": "C1",
120                      "cost": 30_000,
121                      "impressions": 50_000,
122                      "clicks": 500,
123                      "conversions": 60,
124                      "cpa": 11500,
125                  },
126                  "had_prior_spend": True,
127              },
128          )
129          payload = json.loads(result[0].text)
130          assert payload["baseline"] is not None
131          cpa_anomaly = next(
132              a for a in payload["anomalies"] if a["metric"] == "cpa"
133          )
134          assert cpa_anomaly["severity"] == "critical"
135  
136      @pytest.mark.asyncio
137      async def test_insufficient_history_no_baseline(
138          self, sandboxed_cwd: Path
139      ) -> None:
140          """Below min_baseline_entries, baseline is None and CPA is not evaluated."""
141          _write_state(
142              sandboxed_cwd / "STATE.json",
143              [_history_entry(cost=10_000, conversions=50, cpa=5000)],
144          )
145          result = await handle_tool(
146              "analysis.anomalies.check",
147              {
148                  "current": {
149                      "campaign_id": "C1",
150                      "cost": 30_000,
151                      "impressions": 50_000,
152                      "clicks": 500,
153                      "conversions": 60,
154                      "cpa": 11500,
155                  },
156                  "had_prior_spend": True,
157              },
158          )
159          payload = json.loads(result[0].text)
160          assert payload["baseline"] is None
161          assert all(a["metric"] != "cpa" for a in payload["anomalies"])
162  
163      @pytest.mark.asyncio
164      async def test_missing_campaign_id_raises(self, sandboxed_cwd: Path) -> None:
165          with pytest.raises(ValueError, match="campaign_id"):
166              await handle_tool(
167                  "analysis.anomalies.check",
168                  {"current": {"cost": 0}},
169              )
170  
171      @pytest.mark.asyncio
172      async def test_missing_current_raises(self, sandboxed_cwd: Path) -> None:
173          with pytest.raises(ValueError, match="current"):
174              await handle_tool("analysis.anomalies.check", {})
175  
176      @pytest.mark.asyncio
177      async def test_path_traversal_refused(self, sandboxed_cwd: Path) -> None:
178          outside = sandboxed_cwd.parent / "rogue_STATE.json"
179          _write_state(outside, [])
180          result = await handle_tool(
181              "analysis.anomalies.check",
182              {
183                  "current": {"campaign_id": "C1", "cost": 0},
184                  "state_file": str(outside),
185              },
186          )
187          payload = json.loads(result[0].text)
188          assert (
189              "error" in payload
190              and "current working directory" in payload["error"]
191          )
192  
193      @pytest.mark.asyncio
194      async def test_ctr_drop_detected_with_history(self, sandboxed_cwd: Path) -> None:
195          """CTR drop to 0.3x baseline fires CRITICAL."""
196          entries = [
197              _history_entry(
198                  cost=10_000,
199                  impressions=50_000,
200                  clicks=500,
201                  ctr=0.01,
202                  conversions=50,
203              )
204              for _ in range(7)
205          ]
206          _write_state(sandboxed_cwd / "STATE.json", entries)
207  
208          result = await handle_tool(
209              "analysis.anomalies.check",
210              {
211                  "current": {
212                      "campaign_id": "C1",
213                      "cost": 10_000,
214                      "impressions": 50_000,
215                      "clicks": 100,  # 0.2% CTR = 0.2× baseline
216                      "conversions": 50,
217                      "ctr": 0.002,
218                  },
219                  "had_prior_spend": True,
220              },
221          )
222          payload = json.loads(result[0].text)
223          ctr_anomaly = next(a for a in payload["anomalies"] if a["metric"] == "ctr")
224          assert ctr_anomaly["severity"] == "critical"
225  
226      @pytest.mark.asyncio
227      async def test_had_prior_spend_false_suppresses_zero_spend(
228          self, sandboxed_cwd: Path
229      ) -> None:
230          """Fresh campaigns never trigger zero-spend alerts."""
231          result = await handle_tool(
232              "analysis.anomalies.check",
233              {
234                  "current": {"campaign_id": "C_NEW", "cost": 0},
235                  "had_prior_spend": False,
236              },
237          )
238          payload = json.loads(result[0].text)
239          assert payload["anomalies"] == []
240  
241      @pytest.mark.asyncio
242      async def test_symlink_state_file_refused(self, sandboxed_cwd: Path) -> None:
243          """A symlink inside CWD pointing outside must be refused even if the
244          resolved target happens to also live under CWD (defense in depth:
245          swapping the symlink target is agent-writable)."""
246          outside = sandboxed_cwd.parent / "rogue_STATE.json"
247          _write_state(outside, [])
248          link = sandboxed_cwd / "STATE.json"
249          link.symlink_to(outside)
250  
251          result = await handle_tool(
252              "analysis.anomalies.check",
253              {"current": {"campaign_id": "C1", "cost": 0}, "state_file": "STATE.json"},
254          )
255          payload = json.loads(result[0].text)
256          assert "error" in payload
257  
258      @pytest.mark.asyncio
259      async def test_min_baseline_entries_zero_refused(
260          self, sandboxed_cwd: Path
261      ) -> None:
262          with pytest.raises(ValueError, match="min_baseline_entries"):
263              await handle_tool(
264                  "analysis.anomalies.check",
265                  {
266                      "current": {"campaign_id": "C1", "cost": 0},
267                      "min_baseline_entries": 0,
268                  },
269              )
270  
271      @pytest.mark.asyncio
272      async def test_malformed_metrics_row_tolerated(
273          self, sandboxed_cwd: Path
274      ) -> None:
275          """A history entry with string-typed / N/A metrics must not break detection."""
276          entries: list[ActionLogEntry] = [
277              ActionLogEntry(
278                  timestamp=f"2026-04-0{i}",
279                  action="adjust_bid",
280                  platform="google_ads",
281                  campaign_id="C1",
282                  metrics_at_action={
283                      "cost": "N/A" if i == 0 else 10_000,
284                      "conversions": 50,
285                      "cpa": 5000,
286                      "impressions": 50_000,
287                      "clicks": 500,
288                      "ctr": 0.01,
289                  },
290              )
291              for i in range(8)
292          ]
293          _write_state(sandboxed_cwd / "STATE.json", entries)
294  
295          result = await handle_tool(
296              "analysis.anomalies.check",
297              {
298                  "current": {
299                      "campaign_id": "C1",
300                      "cost": 10_000,
301                      "impressions": 50_000,
302                      "clicks": 500,
303                      "conversions": 50,
304                  },
305                  "had_prior_spend": True,
306              },
307          )
308          payload = json.loads(result[0].text)
309          assert payload["baseline"] is not None