whatsapp_webhook.py
1 """WhatsApp Business Cloud API webhook. 2 3 Single shared webhook URL routes inbound messages to projects by 4 ``entry[0].changes[0].value.metadata.phone_number_id``. Each project's 5 HMAC signature is verified against its own ``whatsapp_app_secret``, so 6 multitenancy is kept honest by per-project secrets rather than by URL 7 namespacing. 8 9 Two endpoints under ``/webhooks/whatsapp``: 10 11 * ``GET`` — Meta's subscription handshake. Echoes ``hub.challenge`` when 12 ``hub.verify_token`` matches *any* project's stored verify token. 13 * ``POST`` — inbound message delivery. Verifies the signature, looks up 14 the project by phone-number id, runs the agent with 15 ``chat_id=f"whatsapp_{from_phone}"``, and posts the reply back via 16 Meta's Graph API. Always returns ``200`` within the request — heavy 17 work goes to ``BackgroundTasks`` because Meta retries aggressively on 18 any non-2xx (or any response taking more than ~10s). 19 """ 20 from __future__ import annotations 21 22 import asyncio 23 import json 24 import logging 25 from typing import Optional 26 27 from fastapi import APIRouter, BackgroundTasks, Depends, Request, Response, HTTPException, Query 28 29 from restai.auth import get_current_username_project 30 from restai.database import DBWrapper, get_db_wrapper 31 from restai.models.databasemodels import ProjectDatabase 32 from restai.models.models import ChatModel, User 33 from restai.utils.crypto import decrypt_field 34 from restai.whatsapp import send_message, verify_signature, validate_token 35 36 logger = logging.getLogger(__name__) 37 38 router = APIRouter() 39 40 41 def _project_options(proj: ProjectDatabase) -> dict: 42 try: 43 return json.loads(proj.options) if proj.options else {} 44 except Exception: 45 return {} 46 47 48 def _find_project_by_verify_token(db, verify_token: str) -> Optional[ProjectDatabase]: 49 """Search every project for a matching whatsapp_verify_token. Slow 50 O(N) but only runs during the one-time Meta subscription handshake.""" 51 for proj in db.db.query(ProjectDatabase).all(): 52 opts = _project_options(proj) 53 token = decrypt_field(opts.get("whatsapp_verify_token") or "") 54 if token and token == verify_token: 55 return proj 56 return None 57 58 59 def _find_project_by_phone_id(db, phone_number_id: str) -> Optional[ProjectDatabase]: 60 """Look up the project whose whatsapp_phone_number_id matches. The 61 field is stored in plaintext (it's not a secret) so we can do a 62 direct LIKE filter, then confirm in Python.""" 63 for proj in db.db.query(ProjectDatabase).all(): 64 opts = _project_options(proj) 65 if (opts.get("whatsapp_phone_number_id") or "") == phone_number_id: 66 return proj 67 return None 68 69 70 def _parse_allowlist(raw: str) -> set[str]: 71 """Same shape as Telegram's allowlist parser, but kept as strings — 72 WhatsApp ids are E.164 phone numbers, not ints.""" 73 out: set[str] = set() 74 if not raw: 75 return out 76 for piece in raw.replace(";", ",").split(","): 77 piece = piece.strip().lstrip("+") 78 if piece: 79 out.add(piece) 80 return out 81 82 83 @router.get("/webhooks/whatsapp") 84 async def verify_webhook( 85 request: Request, 86 hub_mode: Optional[str] = Query(None, alias="hub.mode"), 87 hub_challenge: Optional[str] = Query(None, alias="hub.challenge"), 88 hub_verify_token: Optional[str] = Query(None, alias="hub.verify_token"), 89 ): 90 """Meta's webhook subscription handshake. We echo back ``hub.challenge`` 91 only when the supplied ``hub.verify_token`` matches one of the 92 project-configured verify tokens. Returns 403 otherwise so Meta 93 surfaces a clear failure to the admin in Business Suite.""" 94 if hub_mode != "subscribe" or not hub_challenge or not hub_verify_token: 95 raise HTTPException(status_code=400, detail="missing required hub.* params") 96 97 db = get_db_wrapper() 98 try: 99 proj = _find_project_by_verify_token(db, hub_verify_token) 100 if proj is None: 101 logger.warning("WhatsApp webhook verify failed — no project matched verify_token") 102 raise HTTPException(status_code=403, detail="verify_token mismatch") 103 logger.info(f"WhatsApp webhook verified for project '{proj.name}' (id={proj.id})") 104 return Response(content=hub_challenge, media_type="text/plain") 105 finally: 106 db.db.close() 107 108 109 @router.post("/webhooks/whatsapp") 110 async def receive_webhook(request: Request, background_tasks: BackgroundTasks): 111 """Receive inbound WhatsApp messages. Always returns 200 unless the 112 signature fails — Meta retries on non-2xx, which would cause 113 duplicate messages.""" 114 raw = await request.body() 115 sig = request.headers.get("X-Hub-Signature-256") or request.headers.get("x-hub-signature-256") 116 117 try: 118 payload = json.loads(raw or b"{}") 119 except Exception: 120 logger.warning("WhatsApp webhook received non-JSON body") 121 return {"status": "ignored", "reason": "invalid json"} 122 123 if payload.get("object") != "whatsapp_business_account": 124 return {"status": "ignored", "reason": "unexpected object"} 125 126 db = get_db_wrapper() 127 try: 128 for entry in payload.get("entry", []) or []: 129 for change in entry.get("changes", []) or []: 130 value = change.get("value") or {} 131 metadata = value.get("metadata") or {} 132 phone_number_id = metadata.get("phone_number_id") 133 if not phone_number_id: 134 continue 135 136 proj = _find_project_by_phone_id(db, phone_number_id) 137 if proj is None: 138 logger.info(f"WhatsApp inbound for unknown phone_number_id={phone_number_id} — dropping") 139 continue 140 141 opts = _project_options(proj) 142 app_secret = decrypt_field(opts.get("whatsapp_app_secret") or "") 143 access_token = decrypt_field(opts.get("whatsapp_access_token") or "") 144 145 if not verify_signature(raw, sig, app_secret): 146 logger.warning( 147 f"WhatsApp signature verification FAILED for project '{proj.name}' " 148 f"(id={proj.id}) — possible attacker probe" 149 ) 150 raise HTTPException(status_code=401, detail="signature mismatch") 151 152 if not access_token: 153 logger.warning(f"WhatsApp project '{proj.name}' has no access_token — cannot reply") 154 continue 155 156 allowed = _parse_allowlist(opts.get("whatsapp_allowed_phone_numbers") or "") 157 158 for message in value.get("messages", []) or []: 159 background_tasks.add_task( 160 _handle_message_safe, 161 proj.id, phone_number_id, access_token, allowed, message, 162 ) 163 164 return {"status": "ok"} 165 finally: 166 db.db.close() 167 168 169 def _handle_message_safe(project_id: int, phone_number_id: str, access_token: str, 170 allowed: set[str], message: dict) -> None: 171 """Background task wrapper — never raises so a single bad message 172 can't poison the BackgroundTasks queue.""" 173 try: 174 _handle_message(project_id, phone_number_id, access_token, allowed, message) 175 except Exception: 176 logger.exception(f"WhatsApp message handler crashed (project={project_id})") 177 178 179 def _handle_message(project_id: int, phone_number_id: str, access_token: str, 180 allowed: set[str], message: dict) -> None: 181 msg_type = message.get("type") 182 from_phone = (message.get("from") or "").lstrip("+") 183 if not from_phone: 184 return 185 186 # Allowlist gate — applies to all message types so the polite reply 187 # is consistent. Empty allowlist = open access. 188 if allowed and from_phone not in allowed: 189 logger.info(f"WhatsApp sender {from_phone} not in allowlist for project {project_id}") 190 try: 191 send_message( 192 access_token, phone_number_id, from_phone, 193 "You are not authorized to use this bot. " 194 f"If you should be, ask the admin to add {from_phone} to the project's allowlist.", 195 ) 196 except Exception as e: 197 logger.warning(f"Failed to send unauthorized reply: {e}") 198 return 199 200 if msg_type != "text": 201 # MVP scope is text-only. Replying once keeps the user from 202 # waiting silently for an answer the agent will never produce. 203 logger.info(f"WhatsApp inbound type={msg_type!r} from {from_phone} — replying with text-only notice") 204 try: 205 send_message( 206 access_token, phone_number_id, from_phone, 207 "Sorry, I can only handle text messages right now.", 208 ) 209 except Exception as e: 210 logger.warning(f"Failed to send text-only notice: {e}") 211 return 212 213 text = (message.get("text") or {}).get("body") or "" 214 if not text.strip(): 215 return 216 217 logger.info(f"WhatsApp ← {from_phone} (project={project_id}): {text[:200]!r}") 218 219 try: 220 response_text = asyncio.run(_run_agent(project_id, text, from_phone)) 221 except Exception: 222 logger.exception(f"Agent dispatch failed for WhatsApp message (project={project_id})") 223 return 224 225 if not response_text: 226 logger.warning(f"WhatsApp agent returned empty response for project={project_id}") 227 return 228 229 try: 230 send_message(access_token, phone_number_id, from_phone, response_text) 231 except Exception as e: 232 logger.warning(f"WhatsApp send failed for project={project_id}: {e}") 233 234 235 async def _run_agent(project_id: int, text: str, from_phone: str) -> Optional[str]: 236 """Invoke the project's chat pipeline. Mirrors the Telegram cron's 237 helper — same `chat_id=f"<channel>_{user_id}"` convention so each 238 customer keeps a sticky conversation across messages.""" 239 from restai.brain import Brain 240 from restai.helper import chat_main 241 from fastapi import BackgroundTasks as _BG 242 243 brain = Brain(lightweight=True) 244 db = get_db_wrapper() 245 try: 246 project = brain.find_project(project_id, db) 247 if project is None: 248 logger.warning(f"_run_agent: project {project_id} not found") 249 return None 250 251 chat_input = ChatModel(question=text, id=f"whatsapp_{from_phone}") 252 253 user_db = db.get_user_by_username("admin") 254 if user_db is None: 255 logger.warning("_run_agent: no 'admin' user — cannot run agent") 256 return None 257 user = User.model_validate(user_db) 258 259 bg = _BG() 260 result = await chat_main(None, brain, project, chat_input, user, db, bg) 261 await bg() 262 263 if isinstance(result, dict): 264 return result.get("answer", "") or None 265 return None 266 finally: 267 db.db.close() 268 269 270 # ─── Admin: Test Connection ───────────────────────────────────────────── 271 @router.post("/projects/{projectID}/whatsapp/test") 272 async def test_whatsapp_connection( 273 projectID: int, 274 user: User = Depends(get_current_username_project), 275 db_wrapper: DBWrapper = Depends(get_db_wrapper), 276 ): 277 """Hits Meta's ``GET /{phone_number_id}`` to confirm the project's 278 credentials are valid without sending a real message. Surfaces the 279 display name + verified status to the admin's "Test Connection" 280 button on the project edit page.""" 281 proj = db_wrapper.get_project_by_id(projectID) 282 if proj is None: 283 raise HTTPException(status_code=404, detail="project not found") 284 opts = _project_options(proj) 285 access_token = decrypt_field(opts.get("whatsapp_access_token") or "") 286 phone_number_id = opts.get("whatsapp_phone_number_id") or "" 287 if not access_token or not phone_number_id: 288 return {"ok": False, "error": "WhatsApp is not configured for this project"} 289 return validate_token(access_token, phone_number_id)