/ tests / test_rollback_execute.py
test_rollback_execute.py
  1  """Rollback executor tests.
  2  
  3  Covers the safety contract of ``execute_rollback``: the function that
  4  loads STATE.json, re-plans an entry, re-dispatches the reversal
  5  through the MCP handler map, and appends an ``ActionLogEntry`` with
  6  ``rollback_of`` populated on success.
  7  
  8  The executor must never bypass the planner's allow-list — these tests
  9  lock that in by asserting which dispatches happen and which are
 10  refused.
 11  """
 12  
 13  from __future__ import annotations
 14  
 15  import json
 16  from collections.abc import Awaitable, Callable
 17  from pathlib import Path
 18  from typing import Any
 19  
 20  import pytest
 21  
 22  from mureo.context.models import ActionLogEntry, StateDocument
 23  from mureo.context.state import read_state_file, write_state_file
 24  
 25  # The module under test does not exist yet — this import is the RED
 26  # that drives implementation.
 27  from mureo.rollback.executor import (  # noqa: I001
 28      RollbackExecutionError,
 29      execute_rollback,
 30  )
 31  
 32  
 33  Dispatcher = Callable[[str, dict[str, Any]], Awaitable[list[Any]]]
 34  
 35  
 36  def _write_state(path: Path, entries: list[ActionLogEntry]) -> None:
 37      write_state_file(path, StateDocument(version="2", action_log=tuple(entries)))
 38  
 39  
 40  def _budget_update_entry(
 41      *,
 42      timestamp: str = "2026-04-15T10:00:00",
 43      campaign_id: str = "100",
 44      budget_id: str = "B1",
 45      amount_micros: int = 5_000_000_000,
 46  ) -> ActionLogEntry:
 47      return ActionLogEntry(
 48          timestamp=timestamp,
 49          action="google_ads.budgets.update",
 50          platform="google_ads",
 51          campaign_id=campaign_id,
 52          summary="Increased budget for traffic test",
 53          reversible_params={
 54              "operation": "google_ads.budgets.update",
 55              "params": {"budget_id": budget_id, "amount_micros": amount_micros},
 56          },
 57      )
 58  
 59  
 60  class _FakeDispatcher:
 61      """Records every dispatch call and returns a canned result."""
 62  
 63      def __init__(
 64          self,
 65          *,
 66          return_value: list[Any] | None = None,
 67          raise_exc: Exception | None = None,
 68      ) -> None:
 69          self.calls: list[tuple[str, dict[str, Any]]] = []
 70          self._return_value = (
 71              return_value if return_value is not None else [{"ok": True}]
 72          )
 73          self._raise_exc = raise_exc
 74  
 75      async def __call__(self, name: str, arguments: dict[str, Any]) -> list[Any]:
 76          self.calls.append((name, dict(arguments)))
 77          if self._raise_exc is not None:
 78              raise self._raise_exc
 79          return self._return_value
 80  
 81  
 82  @pytest.mark.unit
 83  class TestExecuteRollback:
 84      @pytest.mark.asyncio
 85      async def test_executes_supported_plan_and_appends_log(
 86          self, tmp_path: Path
 87      ) -> None:
 88          """A supported plan is dispatched with the planner's params, and a
 89          new ActionLogEntry tagged rollback_of=<index> is appended."""
 90          state_file = tmp_path / "STATE.json"
 91          entry = _budget_update_entry()
 92          _write_state(state_file, [entry])
 93          dispatcher = _FakeDispatcher()
 94  
 95          result = await execute_rollback(
 96              state_file=state_file,
 97              index=0,
 98              confirm=True,
 99              dispatcher=dispatcher,
100          )
101  
102          assert result["status"] == "applied"
103          assert result["dispatched_tool"] == "google_ads.budgets.update"
104          assert dispatcher.calls == [
105              (
106                  "google_ads.budgets.update",
107                  {"budget_id": "B1", "amount_micros": 5_000_000_000},
108              )
109          ]
110  
111          doc = read_state_file(state_file)
112          assert len(doc.action_log) == 2
113          new_entry = doc.action_log[1]
114          assert new_entry.action == "google_ads.budgets.update"
115          assert new_entry.platform == "google_ads"
116          assert new_entry.rollback_of == 0
117          # Rollback of a rollback must not be chained by default.
118          assert new_entry.reversible_params is None
119  
120      @pytest.mark.asyncio
121      async def test_confirm_false_refuses(self, tmp_path: Path) -> None:
122          state_file = tmp_path / "STATE.json"
123          _write_state(state_file, [_budget_update_entry()])
124          dispatcher = _FakeDispatcher()
125  
126          with pytest.raises(RollbackExecutionError, match="confirm"):
127              await execute_rollback(
128                  state_file=state_file,
129                  index=0,
130                  confirm=False,
131                  dispatcher=dispatcher,
132              )
133  
134          assert dispatcher.calls == []
135          doc = read_state_file(state_file)
136          assert len(doc.action_log) == 1  # unchanged
137  
138      @pytest.mark.asyncio
139      async def test_out_of_range_index(self, tmp_path: Path) -> None:
140          state_file = tmp_path / "STATE.json"
141          _write_state(state_file, [_budget_update_entry()])
142          dispatcher = _FakeDispatcher()
143  
144          with pytest.raises(RollbackExecutionError, match="out of range"):
145              await execute_rollback(
146                  state_file=state_file,
147                  index=5,
148                  confirm=True,
149                  dispatcher=dispatcher,
150              )
151          assert dispatcher.calls == []
152  
153      @pytest.mark.asyncio
154      async def test_read_only_entry_refused(self, tmp_path: Path) -> None:
155          state_file = tmp_path / "STATE.json"
156          _write_state(
157              state_file,
158              [
159                  ActionLogEntry(
160                      timestamp="t",
161                      action="list_campaigns",
162                      platform="google_ads",
163                  )
164              ],
165          )
166          dispatcher = _FakeDispatcher()
167  
168          with pytest.raises(RollbackExecutionError, match="nothing to roll back"):
169              await execute_rollback(
170                  state_file=state_file,
171                  index=0,
172                  confirm=True,
173                  dispatcher=dispatcher,
174              )
175          assert dispatcher.calls == []
176  
177      @pytest.mark.asyncio
178      async def test_not_supported_refused(self, tmp_path: Path) -> None:
179          state_file = tmp_path / "STATE.json"
180          entry = ActionLogEntry(
181              timestamp="t",
182              action="google_ads.campaigns.delete",
183              platform="google_ads",
184              reversible_params={
185                  "operation": "google_ads.campaigns.delete",
186                  "params": {"campaign_id": "100"},
187              },
188          )
189          _write_state(state_file, [entry])
190          dispatcher = _FakeDispatcher()
191  
192          with pytest.raises(RollbackExecutionError, match="not supported"):
193              await execute_rollback(
194                  state_file=state_file,
195                  index=0,
196                  confirm=True,
197                  dispatcher=dispatcher,
198              )
199          assert dispatcher.calls == []
200  
201      @pytest.mark.asyncio
202      async def test_dispatch_failure_does_not_append_log(
203          self, tmp_path: Path
204      ) -> None:
205          state_file = tmp_path / "STATE.json"
206          _write_state(state_file, [_budget_update_entry()])
207          dispatcher = _FakeDispatcher(raise_exc=RuntimeError("API exploded"))
208  
209          with pytest.raises(RuntimeError, match="API exploded"):
210              await execute_rollback(
211                  state_file=state_file,
212                  index=0,
213                  confirm=True,
214                  dispatcher=dispatcher,
215              )
216  
217          doc = read_state_file(state_file)
218          assert len(doc.action_log) == 1  # no rollback entry appended
219  
220      @pytest.mark.asyncio
221      async def test_double_rollback_refused(self, tmp_path: Path) -> None:
222          """Applying a rollback twice for the same index must be rejected."""
223          state_file = tmp_path / "STATE.json"
224          _write_state(state_file, [_budget_update_entry()])
225          dispatcher = _FakeDispatcher()
226  
227          # First rollback: succeeds.
228          await execute_rollback(
229              state_file=state_file,
230              index=0,
231              confirm=True,
232              dispatcher=dispatcher,
233          )
234  
235          # Second rollback of the same index: refused.
236          with pytest.raises(RollbackExecutionError, match="already rolled back"):
237              await execute_rollback(
238                  state_file=state_file,
239                  index=0,
240                  confirm=True,
241                  dispatcher=dispatcher,
242              )
243  
244          # Dispatcher called exactly once.
245          assert len(dispatcher.calls) == 1
246  
247      @pytest.mark.asyncio
248      async def test_missing_state_file_refused(self, tmp_path: Path) -> None:
249          state_file = tmp_path / "does_not_exist.json"
250          dispatcher = _FakeDispatcher()
251  
252          with pytest.raises(RollbackExecutionError, match="not found"):
253              await execute_rollback(
254                  state_file=state_file,
255                  index=0,
256                  confirm=True,
257                  dispatcher=dispatcher,
258              )
259          assert dispatcher.calls == []
260  
261      @pytest.mark.asyncio
262      async def test_partial_rollback_surfaces_caveats(self, tmp_path: Path) -> None:
263          """A PARTIAL plan still applies but caveats are returned to caller."""
264          state_file = tmp_path / "STATE.json"
265          entry = ActionLogEntry(
266              timestamp="t",
267              action="google_ads.budgets.update",
268              platform="google_ads",
269              reversible_params={
270                  "operation": "google_ads.budgets.update",
271                  "params": {"budget_id": "B1", "amount_micros": 1_000_000_000},
272                  "caveats": ["Spend already incurred cannot be refunded."],
273              },
274          )
275          _write_state(state_file, [entry])
276          dispatcher = _FakeDispatcher()
277  
278          result = await execute_rollback(
279              state_file=state_file,
280              index=0,
281              confirm=True,
282              dispatcher=dispatcher,
283          )
284  
285          assert result["status"] == "applied"
286          assert result["caveats"] == [
287              "Spend already incurred cannot be refunded."
288          ]
289  
290  
@pytest.mark.unit
class TestExecutorWiringContract:
    """Pins down what a successful rollback writes back into STATE.json."""

    @pytest.mark.asyncio
    async def test_new_log_entry_references_original_action(
        self, tmp_path: Path
    ) -> None:
        """The appended entry keeps the campaign id and its summary mentions
        "Rolled back" together with the index."""
        state_path = tmp_path / "STATE.json"
        seed = _budget_update_entry(
            timestamp="2026-04-15T10:00:00",
            campaign_id="CID-42",
        )
        _write_state(state_path, [seed])

        await execute_rollback(
            state_file=state_path,
            index=0,
            confirm=True,
            dispatcher=_FakeDispatcher(),
        )

        appended = read_state_file(state_path).action_log[1]
        assert appended.campaign_id == "CID-42"
        summary_text = appended.summary or ""
        assert "Rolled back" in summary_text
        assert "0" in summary_text

    @pytest.mark.asyncio
    async def test_state_file_is_valid_json_after_success(
        self, tmp_path: Path
    ) -> None:
        """After a rollback the file on disk parses as JSON and links entry 1 to 0."""
        state_path = tmp_path / "STATE.json"
        _write_state(state_path, [_budget_update_entry()])

        await execute_rollback(
            state_file=state_path,
            index=0,
            confirm=True,
            dispatcher=_FakeDispatcher(),
        )

        # Read the raw bytes back rather than going through read_state_file.
        raw = state_path.read_text(encoding="utf-8")
        payload = json.loads(raw)
        assert payload["action_log"][1]["rollback_of"] == 0