test_webhooks.py
1 """Project event webhook tests. 2 3 Covers the helper (`emit_event`) end-to-end with mocked HTTP, plus the 4 admin Test endpoint. 5 6 The HTTP POST happens in a daemon thread (so a flaky receiver can't 7 stall a cron / inference). Tests collect the request via a list shared 8 with the patched `requests.post`, then sleep briefly to let the thread 9 flush. Any test that depends on the request firing has a 10 ``thread.join``-ish guard via a ``threading.Event``. 11 """ 12 from __future__ import annotations 13 14 import hashlib 15 import hmac 16 import json 17 import threading 18 import time 19 from types import SimpleNamespace 20 from unittest.mock import MagicMock, patch 21 22 import pytest 23 24 from restai.utils.crypto import encrypt_field 25 26 27 # ─── helper ───────────────────────────────────────────────────────────── 28 29 def _wait_for(event: threading.Event, timeout: float = 2.0): 30 assert event.wait(timeout), "background webhook thread did not run within timeout" 31 32 33 def test_emit_event_skips_when_no_url(): 34 from restai.webhooks import emit_event 35 out = emit_event(1, "p", {}, "test", {"hi": True}) 36 assert out is False 37 38 39 def test_emit_event_skips_unknown_event_type(): 40 from restai.webhooks import emit_event 41 opts = {"webhook_url": "https://hooks.example.com/x"} 42 out = emit_event(1, "p", opts, "made_up_event", {}) 43 assert out is False 44 45 46 def test_emit_event_filters_by_subscription(): 47 from restai.webhooks import emit_event 48 opts = { 49 "webhook_url": "https://hooks.example.com/x", 50 "webhook_events": "sync_completed", 51 } 52 fired = emit_event(1, "p", opts, "budget_exceeded", {}) 53 assert fired is False 54 55 56 def test_emit_event_refuses_private_url(): 57 from restai.webhooks import emit_event 58 opts = {"webhook_url": "http://127.0.0.1:9999/hook"} 59 out = emit_event(1, "p", opts, "test", {}) 60 assert out is False 61 62 63 def test_emit_event_refuses_non_http_scheme(): 64 from restai.webhooks import emit_event 65 opts = {"webhook_url": "file:///etc/passwd"} 66 out = emit_event(1, "p", opts, "test", {}) 67 assert out is False 68 69 70 def test_emit_event_signs_with_hmac(): 71 from restai.webhooks import emit_event 72 secret = "super-shared-secret-123" 73 opts = { 74 "webhook_url": "https://hooks.example.com/x", 75 "webhook_secret": encrypt_field(secret), 76 } 77 captured = {} 78 done = threading.Event() 79 80 class _FakeResp: 81 status_code = 200 82 text = "" 83 84 def fake_post(url, data=None, headers=None, timeout=None): 85 captured.update({"url": url, "data": data, "headers": dict(headers), "timeout": timeout}) 86 done.set() 87 return _FakeResp() 88 89 with patch("requests.post", fake_post), \ 90 patch("restai.webhooks._is_private_ip", lambda h: False): 91 out = emit_event(42, "myproj", opts, "test", {"foo": "bar"}) 92 assert out is True 93 _wait_for(done) 94 95 assert captured["url"] == "https://hooks.example.com/x" 96 assert captured["headers"]["X-RESTai-Event"] == "test" 97 sig_header = captured["headers"]["X-RESTai-Signature"] 98 assert sig_header.startswith("sha256=") 99 expected = hmac.new(secret.encode(), captured["data"], hashlib.sha256).hexdigest() 100 assert sig_header == f"sha256={expected}" 101 payload = json.loads(captured["data"]) 102 assert payload["event"] == "test" 103 assert payload["project_id"] == 42 104 assert payload["project_name"] == "myproj" 105 assert payload["data"] == {"foo": "bar"} 106 107 108 def test_emit_event_omits_signature_when_no_secret(): 109 from restai.webhooks import emit_event 110 opts = {"webhook_url": "https://hooks.example.com/x"} 111 captured = {} 112 done = threading.Event() 113 114 class _FakeResp: 115 status_code = 200 116 text = "" 117 118 def fake_post(url, data=None, headers=None, timeout=None): 119 captured.update({"headers": dict(headers)}) 120 done.set() 121 return _FakeResp() 122 123 with patch("requests.post", fake_post), \ 124 patch("restai.webhooks._is_private_ip", lambda h: False): 125 emit_event(1, "p", opts, "test", {}) 126 _wait_for(done) 127 128 assert "X-RESTai-Signature" not in captured["headers"] 129 130 131 def test_emit_event_swallows_post_failure(): 132 """A flaky receiver must not raise into the caller — we're called 133 from inference / cron paths that can't be allowed to crash.""" 134 from restai.webhooks import emit_event 135 opts = {"webhook_url": "https://hooks.example.com/x"} 136 done = threading.Event() 137 138 def fake_post(*a, **kw): 139 done.set() 140 raise RuntimeError("network is on fire") 141 142 with patch("requests.post", fake_post), \ 143 patch("restai.webhooks._is_private_ip", lambda h: False): 144 # Must return True (request was queued) even though the POST will fail. 145 out = emit_event(1, "p", opts, "test", {}) 146 assert out is True 147 _wait_for(done) 148 # And no exception bubbles up — give the thread a moment to log. 149 time.sleep(0.05) 150 151 152 # ─── /projects/{id}/webhooks/test endpoint ───────────────────────────── 153 154 @pytest.fixture(scope="module") 155 def client(): 156 from fastapi.testclient import TestClient 157 from restai.main import app 158 with TestClient(app) as c: 159 yield c 160 161 162 @pytest.fixture(scope="module") 163 def project_id(client): 164 """Reuse the same fixture pattern as test_whatsapp_webhook.""" 165 from restai.config import RESTAI_DEFAULT_PASSWORD 166 auth = ("admin", RESTAI_DEFAULT_PASSWORD) 167 teams = client.get("/teams", auth=auth).json().get("teams", []) or [] 168 if not teams: 169 pytest.skip("no team available") 170 listing = client.get("/projects", auth=auth).json().get("projects", []) or [] 171 name = "webhook_test_project" 172 existing = next((p for p in listing if p.get("name") == name), None) 173 if existing: 174 return existing["id"] 175 info = client.get("/info", auth=auth).json() 176 llms = info.get("llms") or [] 177 if not llms: 178 pytest.skip("no LLMs configured") 179 resp = client.post( 180 "/projects", 181 json={"name": name, "type": "agent", "llm": llms[0]["name"], "team_id": teams[0]["id"]}, 182 auth=auth, 183 ) 184 if resp.status_code not in (200, 201): 185 pytest.skip(f"could not create project: {resp.status_code}") 186 return resp.json()["id"] 187 188 189 def test_webhook_test_endpoint_no_url_configured(client, project_id): 190 """The fixture creates a fresh project on first run, but it persists 191 across runs in the dev DB — a prior run of _fires_when_configured 192 may have stamped webhook_url into the options. Clear it explicitly 193 so this test asserts the precondition it actually depends on.""" 194 from restai.config import RESTAI_DEFAULT_PASSWORD 195 from restai.database import get_db_wrapper 196 from restai.models.databasemodels import ProjectDatabase 197 198 db = get_db_wrapper() 199 try: 200 proj = db.db.query(ProjectDatabase).filter(ProjectDatabase.id == project_id).first() 201 opts = json.loads(proj.options or "{}") 202 opts.pop("webhook_url", None) 203 opts.pop("webhook_secret", None) 204 proj.options = json.dumps(opts) 205 db.db.commit() 206 finally: 207 db.db.close() 208 209 r = client.post( 210 f"/projects/{project_id}/webhooks/test", auth=("admin", RESTAI_DEFAULT_PASSWORD), 211 ) 212 assert r.status_code == 200 213 body = r.json() 214 assert body["ok"] is False 215 assert "no webhook_url" in body["reason"] 216 217 218 def test_webhook_test_endpoint_fires_when_configured(client, project_id): 219 """Writing a webhook_url + flipping the test endpoint should queue 220 a POST. We patch requests.post so no real network call happens.""" 221 from restai.config import RESTAI_DEFAULT_PASSWORD 222 from restai.database import get_db_wrapper 223 from restai.models.databasemodels import ProjectDatabase 224 225 auth = ("admin", RESTAI_DEFAULT_PASSWORD) 226 227 # Configure a webhook URL directly in the DB (bypasses model validation). 228 db = get_db_wrapper() 229 try: 230 proj = db.db.query(ProjectDatabase).filter(ProjectDatabase.id == project_id).first() 231 opts = json.loads(proj.options or "{}") 232 opts["webhook_url"] = "https://hooks.example.com/wh-test" 233 opts["webhook_secret"] = encrypt_field("s3cret") 234 proj.options = json.dumps(opts) 235 db.db.commit() 236 finally: 237 db.db.close() 238 239 captured = {} 240 done = threading.Event() 241 class _FakeResp: 242 status_code = 202 243 text = "" 244 def fake_post(url, data=None, headers=None, timeout=None): 245 captured.update({"url": url, "headers": dict(headers)}) 246 done.set() 247 return _FakeResp() 248 249 with patch("requests.post", fake_post), \ 250 patch("restai.webhooks._is_private_ip", lambda h: False): 251 r = client.post(f"/projects/{project_id}/webhooks/test", auth=auth) 252 assert r.status_code == 200 253 assert r.json() == {"ok": True} 254 _wait_for(done) 255 256 assert captured["url"] == "https://hooks.example.com/wh-test" 257 assert captured["headers"]["X-RESTai-Event"] == "test" 258 assert captured["headers"]["X-RESTai-Signature"].startswith("sha256=")