/ tests / test_webhooks.py
test_webhooks.py
  1  """Project event webhook tests.
  2  
  3  Covers the helper (`emit_event`) end-to-end with mocked HTTP, plus the
  4  admin Test endpoint.
  5  
  6  The HTTP POST happens in a daemon thread (so a flaky receiver can't
  7  stall a cron / inference). Tests collect the request via a list shared
  8  with the patched `requests.post`, then sleep briefly to let the thread
  9  flush. Any test that depends on the request firing has a
 10  ``thread.join``-ish guard via a ``threading.Event``.
 11  """
 12  from __future__ import annotations
 13  
 14  import hashlib
 15  import hmac
 16  import json
 17  import threading
 18  import time
 19  from types import SimpleNamespace
 20  from unittest.mock import MagicMock, patch
 21  
 22  import pytest
 23  
 24  from restai.utils.crypto import encrypt_field
 25  
 26  
 27  # ─── helper ─────────────────────────────────────────────────────────────
 28  
 29  def _wait_for(event: threading.Event, timeout: float = 2.0):
 30      assert event.wait(timeout), "background webhook thread did not run within timeout"
 31  
 32  
 33  def test_emit_event_skips_when_no_url():
 34      from restai.webhooks import emit_event
 35      out = emit_event(1, "p", {}, "test", {"hi": True})
 36      assert out is False
 37  
 38  
 39  def test_emit_event_skips_unknown_event_type():
 40      from restai.webhooks import emit_event
 41      opts = {"webhook_url": "https://hooks.example.com/x"}
 42      out = emit_event(1, "p", opts, "made_up_event", {})
 43      assert out is False
 44  
 45  
 46  def test_emit_event_filters_by_subscription():
 47      from restai.webhooks import emit_event
 48      opts = {
 49          "webhook_url": "https://hooks.example.com/x",
 50          "webhook_events": "sync_completed",
 51      }
 52      fired = emit_event(1, "p", opts, "budget_exceeded", {})
 53      assert fired is False
 54  
 55  
 56  def test_emit_event_refuses_private_url():
 57      from restai.webhooks import emit_event
 58      opts = {"webhook_url": "http://127.0.0.1:9999/hook"}
 59      out = emit_event(1, "p", opts, "test", {})
 60      assert out is False
 61  
 62  
 63  def test_emit_event_refuses_non_http_scheme():
 64      from restai.webhooks import emit_event
 65      opts = {"webhook_url": "file:///etc/passwd"}
 66      out = emit_event(1, "p", opts, "test", {})
 67      assert out is False
 68  
 69  
 70  def test_emit_event_signs_with_hmac():
 71      from restai.webhooks import emit_event
 72      secret = "super-shared-secret-123"
 73      opts = {
 74          "webhook_url": "https://hooks.example.com/x",
 75          "webhook_secret": encrypt_field(secret),
 76      }
 77      captured = {}
 78      done = threading.Event()
 79  
 80      class _FakeResp:
 81          status_code = 200
 82          text = ""
 83  
 84      def fake_post(url, data=None, headers=None, timeout=None):
 85          captured.update({"url": url, "data": data, "headers": dict(headers), "timeout": timeout})
 86          done.set()
 87          return _FakeResp()
 88  
 89      with patch("requests.post", fake_post), \
 90           patch("restai.webhooks._is_private_ip", lambda h: False):
 91          out = emit_event(42, "myproj", opts, "test", {"foo": "bar"})
 92          assert out is True
 93          _wait_for(done)
 94  
 95      assert captured["url"] == "https://hooks.example.com/x"
 96      assert captured["headers"]["X-RESTai-Event"] == "test"
 97      sig_header = captured["headers"]["X-RESTai-Signature"]
 98      assert sig_header.startswith("sha256=")
 99      expected = hmac.new(secret.encode(), captured["data"], hashlib.sha256).hexdigest()
100      assert sig_header == f"sha256={expected}"
101      payload = json.loads(captured["data"])
102      assert payload["event"] == "test"
103      assert payload["project_id"] == 42
104      assert payload["project_name"] == "myproj"
105      assert payload["data"] == {"foo": "bar"}
106  
107  
108  def test_emit_event_omits_signature_when_no_secret():
109      from restai.webhooks import emit_event
110      opts = {"webhook_url": "https://hooks.example.com/x"}
111      captured = {}
112      done = threading.Event()
113  
114      class _FakeResp:
115          status_code = 200
116          text = ""
117  
118      def fake_post(url, data=None, headers=None, timeout=None):
119          captured.update({"headers": dict(headers)})
120          done.set()
121          return _FakeResp()
122  
123      with patch("requests.post", fake_post), \
124           patch("restai.webhooks._is_private_ip", lambda h: False):
125          emit_event(1, "p", opts, "test", {})
126          _wait_for(done)
127  
128      assert "X-RESTai-Signature" not in captured["headers"]
129  
130  
131  def test_emit_event_swallows_post_failure():
132      """A flaky receiver must not raise into the caller — we're called
133      from inference / cron paths that can't be allowed to crash."""
134      from restai.webhooks import emit_event
135      opts = {"webhook_url": "https://hooks.example.com/x"}
136      done = threading.Event()
137  
138      def fake_post(*a, **kw):
139          done.set()
140          raise RuntimeError("network is on fire")
141  
142      with patch("requests.post", fake_post), \
143           patch("restai.webhooks._is_private_ip", lambda h: False):
144          # Must return True (request was queued) even though the POST will fail.
145          out = emit_event(1, "p", opts, "test", {})
146          assert out is True
147          _wait_for(done)
148          # And no exception bubbles up — give the thread a moment to log.
149          time.sleep(0.05)
150  
151  
152  # ─── /projects/{id}/webhooks/test endpoint ─────────────────────────────
153  
154  @pytest.fixture(scope="module")
155  def client():
156      from fastapi.testclient import TestClient
157      from restai.main import app
158      with TestClient(app) as c:
159          yield c
160  
161  
162  @pytest.fixture(scope="module")
163  def project_id(client):
164      """Reuse the same fixture pattern as test_whatsapp_webhook."""
165      from restai.config import RESTAI_DEFAULT_PASSWORD
166      auth = ("admin", RESTAI_DEFAULT_PASSWORD)
167      teams = client.get("/teams", auth=auth).json().get("teams", []) or []
168      if not teams:
169          pytest.skip("no team available")
170      listing = client.get("/projects", auth=auth).json().get("projects", []) or []
171      name = "webhook_test_project"
172      existing = next((p for p in listing if p.get("name") == name), None)
173      if existing:
174          return existing["id"]
175      info = client.get("/info", auth=auth).json()
176      llms = info.get("llms") or []
177      if not llms:
178          pytest.skip("no LLMs configured")
179      resp = client.post(
180          "/projects",
181          json={"name": name, "type": "agent", "llm": llms[0]["name"], "team_id": teams[0]["id"]},
182          auth=auth,
183      )
184      if resp.status_code not in (200, 201):
185          pytest.skip(f"could not create project: {resp.status_code}")
186      return resp.json()["id"]
187  
188  
189  def test_webhook_test_endpoint_no_url_configured(client, project_id):
190      """The fixture creates a fresh project on first run, but it persists
191      across runs in the dev DB — a prior run of _fires_when_configured
192      may have stamped webhook_url into the options. Clear it explicitly
193      so this test asserts the precondition it actually depends on."""
194      from restai.config import RESTAI_DEFAULT_PASSWORD
195      from restai.database import get_db_wrapper
196      from restai.models.databasemodels import ProjectDatabase
197  
198      db = get_db_wrapper()
199      try:
200          proj = db.db.query(ProjectDatabase).filter(ProjectDatabase.id == project_id).first()
201          opts = json.loads(proj.options or "{}")
202          opts.pop("webhook_url", None)
203          opts.pop("webhook_secret", None)
204          proj.options = json.dumps(opts)
205          db.db.commit()
206      finally:
207          db.db.close()
208  
209      r = client.post(
210          f"/projects/{project_id}/webhooks/test", auth=("admin", RESTAI_DEFAULT_PASSWORD),
211      )
212      assert r.status_code == 200
213      body = r.json()
214      assert body["ok"] is False
215      assert "no webhook_url" in body["reason"]
216  
217  
218  def test_webhook_test_endpoint_fires_when_configured(client, project_id):
219      """Writing a webhook_url + flipping the test endpoint should queue
220      a POST. We patch requests.post so no real network call happens."""
221      from restai.config import RESTAI_DEFAULT_PASSWORD
222      from restai.database import get_db_wrapper
223      from restai.models.databasemodels import ProjectDatabase
224  
225      auth = ("admin", RESTAI_DEFAULT_PASSWORD)
226  
227      # Configure a webhook URL directly in the DB (bypasses model validation).
228      db = get_db_wrapper()
229      try:
230          proj = db.db.query(ProjectDatabase).filter(ProjectDatabase.id == project_id).first()
231          opts = json.loads(proj.options or "{}")
232          opts["webhook_url"] = "https://hooks.example.com/wh-test"
233          opts["webhook_secret"] = encrypt_field("s3cret")
234          proj.options = json.dumps(opts)
235          db.db.commit()
236      finally:
237          db.db.close()
238  
239      captured = {}
240      done = threading.Event()
241      class _FakeResp:
242          status_code = 202
243          text = ""
244      def fake_post(url, data=None, headers=None, timeout=None):
245          captured.update({"url": url, "headers": dict(headers)})
246          done.set()
247          return _FakeResp()
248  
249      with patch("requests.post", fake_post), \
250           patch("restai.webhooks._is_private_ip", lambda h: False):
251          r = client.post(f"/projects/{project_id}/webhooks/test", auth=auth)
252          assert r.status_code == 200
253          assert r.json() == {"ok": True}
254          _wait_for(done)
255  
256      assert captured["url"] == "https://hooks.example.com/wh-test"
257      assert captured["headers"]["X-RESTai-Event"] == "test"
258      assert captured["headers"]["X-RESTai-Signature"].startswith("sha256=")