# webhooks.py
"""Outbound event webhooks for projects.

Per-project: when an admin sets ``webhook_url`` on a project, RESTai POSTs
JSON event payloads to that URL whenever interesting things happen
(budget exhausted, sync finished, eval finished, routine failed). Lets
SMBs wire RESTai into Zapier / n8n / custom CRMs without scraping the
audit log.

* **One shared signature scheme** — every payload is signed with
  HMAC-SHA256 of the raw body keyed on the project's ``webhook_secret``;
  the digest goes in the ``X-RESTai-Signature`` header as
  ``sha256=<hexdigest>``. Same shape Meta uses for WhatsApp, GitHub uses
  for repos — keeps receiver code reusable.
* **Fire-and-forget** — POSTs run in a background thread with a 10s
  timeout. We log non-2xx responses but never raise into the caller, so
  a flaky receiver can't break inference / cron / eval flows.
* **Subscription filter** — projects can opt into a subset of events via
  the ``webhook_events`` CSV. Empty/missing = subscribe to everything.
* **SSRF guard** — webhooks must point at a public host. Loopback /
  RFC1918 / link-local destinations are refused (matches the SSRF
  guards on `crawler_classic` and `_sync_url`). HTTP redirects are not
  followed, so a public receiver cannot bounce the request to an
  internal address after the check. An admin who needs to test against
  localhost has to do it through a tunnel anyway since worker processes
  don't share localhost.
"""
from __future__ import annotations

import hashlib
import hmac
import json
import logging
import threading
from datetime import datetime, timezone
from typing import Any, Optional
from urllib.parse import urlparse

import requests

from restai.helper import _is_private_ip
from restai.utils.crypto import decrypt_field

logger = logging.getLogger(__name__)

SUPPORTED_EVENTS = {
    "budget_exceeded",
    "sync_completed",
    "eval_completed",
    "routine_failed",
    "test",  # synthetic event the admin Test button fires.
}

# Upper bound (seconds) for a single webhook delivery attempt.
_POST_TIMEOUT_SECONDS = 10


def _project_webhook_config(opts: dict) -> tuple[str, str, set[str]]:
    """Pull ``(url, secret, allowed_events)`` out of a project options blob.

    Args:
        opts: the project's decoded options dict. Reads ``webhook_url``,
            ``webhook_secret`` and ``webhook_events``.

    Returns:
        ``('', '', set())`` when no ``webhook_url`` is configured, so callers
        can early-exit cheaply. Otherwise the stripped url, the secret as
        returned by ``decrypt_field``, and the set of subscribed event names.
        ``webhook_events`` is split on both ``,`` and ``;``; when it is
        empty or missing the project is subscribed to every supported event.
    """
    url = (opts.get("webhook_url") or "").strip()
    if not url:
        return "", "", set()
    secret = decrypt_field(opts.get("webhook_secret") or "")
    raw_events = (opts.get("webhook_events") or "").strip()
    if raw_events:
        # Accept ';' as an alternative separator, then drop blank entries.
        events = {
            e.strip()
            for e in raw_events.replace(";", ",").split(",")
            if e.strip()
        }
    else:
        # Empty = subscribe to everything supported.
        events = set(SUPPORTED_EVENTS)
    return url, secret, events


def _safe_url(url: str) -> Optional[str]:
    """Return ``url`` only if it is safe to POST to, else ``None``.

    "Safe" means an ``http``/``https`` scheme, a non-empty hostname, and a
    hostname that ``_is_private_ip`` does not classify as private/internal.
    Every rejection is logged as a warning; nothing is raised.

    NOTE(review): the host is validated here and resolved again by
    ``requests`` at send time, so a DNS answer that changes between the two
    lookups is not covered by this check.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except Exception as e:
        # urlparse raises ValueError on malformed netlocs (e.g. bad IPv6).
        logger.warning("webhook url could not be parsed: %s", e)
        return None
    if parsed.scheme not in ("http", "https"):
        logger.warning("webhook url has unsupported scheme: %r", url)
        return None
    if not hostname:
        logger.warning("webhook url has no hostname: %r", url)
        return None
    try:
        if _is_private_ip(hostname):
            logger.warning("refusing to POST webhook to private/internal address: %s", hostname)
            return None
    except (ValueError, OSError) as e:
        # ValueError is the documented "unresolvable" signal. OSError also
        # covers a raw resolver failure (socket.gaierror) in case the helper
        # lets one through — the helper lives outside this file.
        logger.warning("webhook url unresolvable: %s", e)
        return None
    return url


def _post_in_thread(url: str, body: bytes, headers: dict) -> None:
    """POST ``body`` to ``url`` on a daemon thread and return immediately.

    The worker logs any non-2xx response or exception and never propagates
    an error, so the caller is not affected by a slow or broken receiver.
    """

    def _go() -> None:
        try:
            # Redirects are disabled on purpose: the SSRF check only covers
            # the configured URL, and a followed 301/302/303 would also
            # downgrade the POST to a GET and drop the signed body.
            resp = requests.post(
                url,
                data=body,
                headers=headers,
                timeout=_POST_TIMEOUT_SECONDS,
                allow_redirects=False,
            )
            if resp.status_code >= 300:
                logger.warning("webhook POST returned HTTP %s: %s", resp.status_code, resp.text[:200])
        except Exception as e:
            logger.warning("webhook POST failed: %s", e)

    threading.Thread(target=_go, daemon=True).start()


def emit_event(project_id: int, project_name: str, opts: dict,
               event_type: str, data: Any) -> bool:
    """Send an event webhook for one project.

    Args:
        project_id: numeric project id
        project_name: project name (included in payload for receiver UX)
        opts: the project's options dict (already JSON-decoded)
        event_type: one of SUPPORTED_EVENTS
        data: event-specific payload (must be JSON-serializable; values
            the encoder does not know are stringified via ``str``). Any
            falsy value is sent as ``{}``.

    Returns ``True`` when a request was queued, ``False`` when it was
    skipped (unknown event, no url configured, event filtered out, unsafe
    url, or a payload that cannot be serialized).
    """
    if event_type not in SUPPORTED_EVENTS:
        logger.warning("emit_event: unknown event_type %r — refusing to send", event_type)
        return False

    url, secret, events = _project_webhook_config(opts)
    if not url:
        return False
    if event_type not in events:
        return False
    safe = _safe_url(url)
    if not safe:
        return False

    payload = {
        "event": event_type,
        "project_id": project_id,
        "project_name": project_name,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": data or {},
    }
    try:
        body = json.dumps(payload, separators=(",", ":"), default=str).encode("utf-8")
    except (TypeError, ValueError) as e:
        # default=str does not rescue circular references or unsupported
        # dict keys; keep the fire-and-forget promise instead of raising.
        logger.warning("emit_event: could not serialize %r payload: %s", event_type, e)
        return False

    headers = {
        "Content-Type": "application/json",
        "User-Agent": "RESTai-Webhook/1.0",
        "X-RESTai-Event": event_type,
    }
    if secret:
        # Sign the exact bytes that go on the wire so receivers can verify
        # against the raw request body.
        sig = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
        headers["X-RESTai-Signature"] = f"sha256={sig}"

    _post_in_thread(safe, body, headers)
    return True


def emit_event_for_project_id(project_id: int, event_type: str, data: Any) -> bool:
    """Convenience wrapper that fetches the project options from the DB
    itself. Use this from places (cron jobs, BackgroundTasks) that
    don't already hold a project handle.

    Returns True/False like ``emit_event``; also ``False`` when the
    database layer cannot be imported or the project does not exist.
    The DB handle opened here is always closed before returning.
    """
    try:
        # Imported lazily; a failure here is reported as "not sent".
        from restai.database import get_db_wrapper
    except Exception as e:
        logger.warning("emit_event_for_project_id: database layer unavailable: %s", e)
        return False
    db = get_db_wrapper()
    try:
        proj = db.get_project_by_id(int(project_id))
        if proj is None:
            return False
        try:
            opts = json.loads(proj.options) if proj.options else {}
        except Exception as e:
            logger.warning("emit_event_for_project_id: project options are not valid JSON: %s", e)
            opts = {}
        if not isinstance(opts, dict):
            # Valid JSON that is not an object (e.g. a list) carries no
            # webhook settings; treat it like an empty configuration.
            opts = {}
        return emit_event(proj.id, proj.name, opts, event_type, data)
    finally:
        db.db.close()