/ restai / webhooks.py
webhooks.py
  1  """Outbound event webhooks for projects.
  2  
  3  Per-project: when an admin sets ``webhook_url`` on a project, RESTai POSTs
  4  JSON event payloads to that URL whenever interesting things happen
  5  (budget exhausted, sync finished, eval finished, routine failed). Lets
  6  SMBs wire RESTai into Zapier / n8n / custom CRMs without scraping the
  7  audit log.
  8  
  9  * **One shared signature scheme** — every payload is signed with
 10    HMAC-SHA256 of the raw body keyed on the project's ``webhook_secret``;
 11    the digest goes in the ``X-RESTai-Signature`` header as
 12    ``sha256=<hexdigest>``. Same shape Meta uses for WhatsApp, GitHub uses
 13    for repos — keeps receiver code reusable.
 14  * **Fire-and-forget** — POSTs run in a background thread with a 10s
 15    timeout. We log non-2xx responses but never raise into the caller, so
 16    a flaky receiver can't break inference / cron / eval flows.
 17  * **Subscription filter** — projects can opt into a subset of events via
 18    the ``webhook_events`` CSV. Empty/missing = subscribe to everything.
 19  * **SSRF guard** — webhooks must point at a public host. Loopback /
 20    RFC1918 / link-local destinations are refused (matches the SSRF
 21    guards on `crawler_classic` and `_sync_url`). An admin who needs to
 22    test against localhost has to do it through a tunnel anyway since
 23    worker processes don't share localhost.
 24  """
 25  from __future__ import annotations
 26  
 27  import hashlib
 28  import hmac
 29  import json
 30  import logging
 31  import threading
 32  from datetime import datetime, timezone
 33  from typing import Any, Optional
 34  from urllib.parse import urlparse
 35  
 36  import requests
 37  
 38  from restai.helper import _is_private_ip
 39  from restai.utils.crypto import decrypt_field
 40  
 41  logger = logging.getLogger(__name__)
 42  
 43  SUPPORTED_EVENTS = {
 44      "budget_exceeded",
 45      "sync_completed",
 46      "eval_completed",
 47      "routine_failed",
 48      "test",  # synthetic event the admin Test button fires.
 49  }
 50  
 51  
 52  def _project_webhook_config(opts: dict) -> tuple[str, str, set[str]]:
 53      """Pull (url, secret, allowed_events) out of a project options blob.
 54      Returns ('', '', set()) when the project hasn't configured webhooks
 55      so callers can early-exit cheaply."""
 56      url = (opts.get("webhook_url") or "").strip()
 57      if not url:
 58          return "", "", set()
 59      secret = decrypt_field(opts.get("webhook_secret") or "")
 60      raw_events = (opts.get("webhook_events") or "").strip()
 61      if raw_events:
 62          events = {e.strip() for e in raw_events.replace(";", ",").split(",") if e.strip()}
 63      else:
 64          # Empty = subscribe to everything supported.
 65          events = set(SUPPORTED_EVENTS)
 66      return url, secret, events
 67  
 68  
 69  def _safe_url(url: str) -> Optional[str]:
 70      """Return the URL only if it's safe to fetch (https? scheme + public
 71      hostname). Returns None and logs a warning otherwise."""
 72      try:
 73          parsed = urlparse(url)
 74      except Exception:
 75          return None
 76      if parsed.scheme not in ("http", "https"):
 77          logger.warning("webhook url has unsupported scheme: %r", url)
 78          return None
 79      hostname = parsed.hostname
 80      if not hostname:
 81          logger.warning("webhook url has no hostname: %r", url)
 82          return None
 83      try:
 84          if _is_private_ip(hostname):
 85              logger.warning("refusing to POST webhook to private/internal address: %s", hostname)
 86              return None
 87      except ValueError as e:
 88          logger.warning("webhook url unresolvable: %s", e)
 89          return None
 90      return url
 91  
 92  
 93  def _post_in_thread(url: str, body: bytes, headers: dict) -> None:
 94      def _go():
 95          try:
 96              resp = requests.post(url, data=body, headers=headers, timeout=10)
 97              if resp.status_code >= 400:
 98                  logger.warning("webhook POST returned HTTP %s: %s", resp.status_code, resp.text[:200])
 99          except Exception as e:
100              logger.warning("webhook POST failed: %s", e)
101      threading.Thread(target=_go, daemon=True).start()
102  
103  
def emit_event(project_id: int, project_name: str, opts: dict,
               event_type: str, data: Any) -> bool:
    """Send an event webhook for one project.

    Args:
        project_id: numeric project id
        project_name: project name (included in payload for receiver UX)
        opts: the project's options dict (already JSON-decoded)
        event_type: one of SUPPORTED_EVENTS
        data: event-specific payload (should be JSON-serializable). Any
            falsy value (``None``, ``{}``, ``[]``, ``0`` ...) is sent as
            ``{}``. Values ``json`` cannot encode natively are
            stringified via ``default=str``.

    Returns ``True`` when a request was queued, ``False`` when it was
    skipped (unknown event type, no url configured, event filtered out,
    unsafe url, or a payload that cannot be serialized).
    """
    if event_type not in SUPPORTED_EVENTS:
        logger.warning("emit_event: unknown event_type %r — refusing to send", event_type)
        return False

    url, secret, events = _project_webhook_config(opts)
    if not url:
        return False
    if event_type not in events:
        return False
    safe = _safe_url(url)
    if not safe:
        return False

    payload = {
        "event": event_type,
        "project_id": project_id,
        "project_name": project_name,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": data or {},
    }
    try:
        # Compact separators: the signature below covers these exact bytes.
        body = json.dumps(payload, separators=(",", ":"), default=str).encode("utf-8")
    except (TypeError, ValueError) as e:
        # default=str does not cover dict keys of unsupported types
        # (TypeError) or circular references (ValueError). Webhooks are
        # best-effort, so skip the event instead of raising into the caller.
        logger.warning(
            "emit_event: could not serialize %r payload for project %s: %s",
            event_type, project_id, e,
        )
        return False

    headers = {
        "Content-Type": "application/json",
        "User-Agent": "RESTai-Webhook/1.0",
        "X-RESTai-Event": event_type,
    }
    if secret:
        # HMAC-SHA256 over the raw body, sent as "sha256=<hexdigest>".
        # Without a secret the request goes out unsigned.
        sig = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
        headers["X-RESTai-Signature"] = f"sha256={sig}"

    _post_in_thread(safe, body, headers)
    return True
150  
151  
def emit_event_for_project_id(project_id: int, event_type: str, data: Any) -> bool:
    """Look the project up by id and emit an event webhook for it.

    Convenience wrapper around ``emit_event`` for call sites (cron jobs,
    BackgroundTasks) that don't already hold a project handle.

    Args:
        project_id: id of the project; passed through ``int()`` before
            the lookup.
        event_type: forwarded to ``emit_event``.
        data: forwarded to ``emit_event``.

    Returns ``False`` when the database layer cannot be imported or the
    project does not exist; otherwise whatever ``emit_event`` returns.
    Project options that are missing, not valid JSON, or that decode to
    something other than an object are treated as empty options.
    """
    try:
        # Imported at call time rather than at module import time.
        from restai.database import get_db_wrapper
    except Exception as e:
        logger.warning("emit_event_for_project_id: database layer unavailable: %s", e)
        return False
    db = get_db_wrapper()
    try:
        proj = db.get_project_by_id(int(project_id))
        if proj is None:
            return False
        try:
            opts = json.loads(proj.options) if proj.options else {}
        except Exception:
            opts = {}
        if not isinstance(opts, dict):
            # e.g. a stored "null" or "[]" decodes fine but has no .get();
            # treat it like an unconfigured project instead of crashing.
            opts = {}
        return emit_event(proj.id, proj.name, opts, event_type, data)
    finally:
        # Always release the DB handle, including on the early returns.
        db.db.close()
171      finally:
172          db.db.close()