/ restai / routers / whatsapp_webhook.py
whatsapp_webhook.py
  1  """WhatsApp Business Cloud API webhook.
  2  
  3  Single shared webhook URL routes inbound messages to projects by
  4  ``entry[0].changes[0].value.metadata.phone_number_id``. Each project's
  5  HMAC signature is verified against its own ``whatsapp_app_secret``, so
  6  multitenancy is kept honest by per-project secrets rather than by URL
  7  namespacing.
  8  
  9  Two endpoints under ``/webhooks/whatsapp``:
 10  
 11  * ``GET`` — Meta's subscription handshake. Echoes ``hub.challenge`` when
 12    ``hub.verify_token`` matches *any* project's stored verify token.
 13  * ``POST`` — inbound message delivery. Verifies the signature, looks up
 14    the project by phone-number id, runs the agent with
 15    ``chat_id=f"whatsapp_{from_phone}"``, and posts the reply back via
 16    Meta's Graph API. Always returns ``200`` within the request — heavy
 17    work goes to ``BackgroundTasks`` because Meta retries aggressively on
 18    any non-2xx (or any response taking more than ~10s).
 19  """
 20  from __future__ import annotations
 21  
 22  import asyncio
 23  import json
 24  import logging
 25  from typing import Optional
 26  
 27  from fastapi import APIRouter, BackgroundTasks, Depends, Request, Response, HTTPException, Query
 28  
 29  from restai.auth import get_current_username_project
 30  from restai.database import DBWrapper, get_db_wrapper
 31  from restai.models.databasemodels import ProjectDatabase
 32  from restai.models.models import ChatModel, User
 33  from restai.utils.crypto import decrypt_field
 34  from restai.whatsapp import send_message, verify_signature, validate_token
 35  
 36  logger = logging.getLogger(__name__)
 37  
 38  router = APIRouter()
 39  
 40  
 41  def _project_options(proj: ProjectDatabase) -> dict:
 42      try:
 43          return json.loads(proj.options) if proj.options else {}
 44      except Exception:
 45          return {}
 46  
 47  
 48  def _find_project_by_verify_token(db, verify_token: str) -> Optional[ProjectDatabase]:
 49      """Search every project for a matching whatsapp_verify_token. Slow
 50      O(N) but only runs during the one-time Meta subscription handshake."""
 51      for proj in db.db.query(ProjectDatabase).all():
 52          opts = _project_options(proj)
 53          token = decrypt_field(opts.get("whatsapp_verify_token") or "")
 54          if token and token == verify_token:
 55              return proj
 56      return None
 57  
 58  
 59  def _find_project_by_phone_id(db, phone_number_id: str) -> Optional[ProjectDatabase]:
 60      """Look up the project whose whatsapp_phone_number_id matches. The
 61      field is stored in plaintext (it's not a secret) so we can do a
 62      direct LIKE filter, then confirm in Python."""
 63      for proj in db.db.query(ProjectDatabase).all():
 64          opts = _project_options(proj)
 65          if (opts.get("whatsapp_phone_number_id") or "") == phone_number_id:
 66              return proj
 67      return None
 68  
 69  
 70  def _parse_allowlist(raw: str) -> set[str]:
 71      """Same shape as Telegram's allowlist parser, but kept as strings —
 72      WhatsApp ids are E.164 phone numbers, not ints."""
 73      out: set[str] = set()
 74      if not raw:
 75          return out
 76      for piece in raw.replace(";", ",").split(","):
 77          piece = piece.strip().lstrip("+")
 78          if piece:
 79              out.add(piece)
 80      return out
 81  
 82  
 83  @router.get("/webhooks/whatsapp")
 84  async def verify_webhook(
 85      request: Request,
 86      hub_mode: Optional[str] = Query(None, alias="hub.mode"),
 87      hub_challenge: Optional[str] = Query(None, alias="hub.challenge"),
 88      hub_verify_token: Optional[str] = Query(None, alias="hub.verify_token"),
 89  ):
 90      """Meta's webhook subscription handshake. We echo back ``hub.challenge``
 91      only when the supplied ``hub.verify_token`` matches one of the
 92      project-configured verify tokens. Returns 403 otherwise so Meta
 93      surfaces a clear failure to the admin in Business Suite."""
 94      if hub_mode != "subscribe" or not hub_challenge or not hub_verify_token:
 95          raise HTTPException(status_code=400, detail="missing required hub.* params")
 96  
 97      db = get_db_wrapper()
 98      try:
 99          proj = _find_project_by_verify_token(db, hub_verify_token)
100          if proj is None:
101              logger.warning("WhatsApp webhook verify failed — no project matched verify_token")
102              raise HTTPException(status_code=403, detail="verify_token mismatch")
103          logger.info(f"WhatsApp webhook verified for project '{proj.name}' (id={proj.id})")
104          return Response(content=hub_challenge, media_type="text/plain")
105      finally:
106          db.db.close()
107  
108  
109  @router.post("/webhooks/whatsapp")
110  async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
111      """Receive inbound WhatsApp messages. Always returns 200 unless the
112      signature fails — Meta retries on non-2xx, which would cause
113      duplicate messages."""
114      raw = await request.body()
115      sig = request.headers.get("X-Hub-Signature-256") or request.headers.get("x-hub-signature-256")
116  
117      try:
118          payload = json.loads(raw or b"{}")
119      except Exception:
120          logger.warning("WhatsApp webhook received non-JSON body")
121          return {"status": "ignored", "reason": "invalid json"}
122  
123      if payload.get("object") != "whatsapp_business_account":
124          return {"status": "ignored", "reason": "unexpected object"}
125  
126      db = get_db_wrapper()
127      try:
128          for entry in payload.get("entry", []) or []:
129              for change in entry.get("changes", []) or []:
130                  value = change.get("value") or {}
131                  metadata = value.get("metadata") or {}
132                  phone_number_id = metadata.get("phone_number_id")
133                  if not phone_number_id:
134                      continue
135  
136                  proj = _find_project_by_phone_id(db, phone_number_id)
137                  if proj is None:
138                      logger.info(f"WhatsApp inbound for unknown phone_number_id={phone_number_id} — dropping")
139                      continue
140  
141                  opts = _project_options(proj)
142                  app_secret = decrypt_field(opts.get("whatsapp_app_secret") or "")
143                  access_token = decrypt_field(opts.get("whatsapp_access_token") or "")
144  
145                  if not verify_signature(raw, sig, app_secret):
146                      logger.warning(
147                          f"WhatsApp signature verification FAILED for project '{proj.name}' "
148                          f"(id={proj.id}) — possible attacker probe"
149                      )
150                      raise HTTPException(status_code=401, detail="signature mismatch")
151  
152                  if not access_token:
153                      logger.warning(f"WhatsApp project '{proj.name}' has no access_token — cannot reply")
154                      continue
155  
156                  allowed = _parse_allowlist(opts.get("whatsapp_allowed_phone_numbers") or "")
157  
158                  for message in value.get("messages", []) or []:
159                      background_tasks.add_task(
160                          _handle_message_safe,
161                          proj.id, phone_number_id, access_token, allowed, message,
162                      )
163  
164          return {"status": "ok"}
165      finally:
166          db.db.close()
167  
168  
169  def _handle_message_safe(project_id: int, phone_number_id: str, access_token: str,
170                            allowed: set[str], message: dict) -> None:
171      """Background task wrapper — never raises so a single bad message
172      can't poison the BackgroundTasks queue."""
173      try:
174          _handle_message(project_id, phone_number_id, access_token, allowed, message)
175      except Exception:
176          logger.exception(f"WhatsApp message handler crashed (project={project_id})")
177  
178  
179  def _handle_message(project_id: int, phone_number_id: str, access_token: str,
180                       allowed: set[str], message: dict) -> None:
181      msg_type = message.get("type")
182      from_phone = (message.get("from") or "").lstrip("+")
183      if not from_phone:
184          return
185  
186      # Allowlist gate — applies to all message types so the polite reply
187      # is consistent. Empty allowlist = open access.
188      if allowed and from_phone not in allowed:
189          logger.info(f"WhatsApp sender {from_phone} not in allowlist for project {project_id}")
190          try:
191              send_message(
192                  access_token, phone_number_id, from_phone,
193                  "You are not authorized to use this bot. "
194                  f"If you should be, ask the admin to add {from_phone} to the project's allowlist.",
195              )
196          except Exception as e:
197              logger.warning(f"Failed to send unauthorized reply: {e}")
198          return
199  
200      if msg_type != "text":
201          # MVP scope is text-only. Replying once keeps the user from
202          # waiting silently for an answer the agent will never produce.
203          logger.info(f"WhatsApp inbound type={msg_type!r} from {from_phone} — replying with text-only notice")
204          try:
205              send_message(
206                  access_token, phone_number_id, from_phone,
207                  "Sorry, I can only handle text messages right now.",
208              )
209          except Exception as e:
210              logger.warning(f"Failed to send text-only notice: {e}")
211          return
212  
213      text = (message.get("text") or {}).get("body") or ""
214      if not text.strip():
215          return
216  
217      logger.info(f"WhatsApp ← {from_phone} (project={project_id}): {text[:200]!r}")
218  
219      try:
220          response_text = asyncio.run(_run_agent(project_id, text, from_phone))
221      except Exception:
222          logger.exception(f"Agent dispatch failed for WhatsApp message (project={project_id})")
223          return
224  
225      if not response_text:
226          logger.warning(f"WhatsApp agent returned empty response for project={project_id}")
227          return
228  
229      try:
230          send_message(access_token, phone_number_id, from_phone, response_text)
231      except Exception as e:
232          logger.warning(f"WhatsApp send failed for project={project_id}: {e}")
233  
234  
235  async def _run_agent(project_id: int, text: str, from_phone: str) -> Optional[str]:
236      """Invoke the project's chat pipeline. Mirrors the Telegram cron's
237      helper — same `chat_id=f"<channel>_{user_id}"` convention so each
238      customer keeps a sticky conversation across messages."""
239      from restai.brain import Brain
240      from restai.helper import chat_main
241      from fastapi import BackgroundTasks as _BG
242  
243      brain = Brain(lightweight=True)
244      db = get_db_wrapper()
245      try:
246          project = brain.find_project(project_id, db)
247          if project is None:
248              logger.warning(f"_run_agent: project {project_id} not found")
249              return None
250  
251          chat_input = ChatModel(question=text, id=f"whatsapp_{from_phone}")
252  
253          user_db = db.get_user_by_username("admin")
254          if user_db is None:
255              logger.warning("_run_agent: no 'admin' user — cannot run agent")
256              return None
257          user = User.model_validate(user_db)
258  
259          bg = _BG()
260          result = await chat_main(None, brain, project, chat_input, user, db, bg)
261          await bg()
262  
263          if isinstance(result, dict):
264              return result.get("answer", "") or None
265          return None
266      finally:
267          db.db.close()
268  
269  
270  # ─── Admin: Test Connection ─────────────────────────────────────────────
271  @router.post("/projects/{projectID}/whatsapp/test")
272  async def test_whatsapp_connection(
273      projectID: int,
274      user: User = Depends(get_current_username_project),
275      db_wrapper: DBWrapper = Depends(get_db_wrapper),
276  ):
277      """Hits Meta's ``GET /{phone_number_id}`` to confirm the project's
278      credentials are valid without sending a real message. Surfaces the
279      display name + verified status to the admin's "Test Connection"
280      button on the project edit page."""
281      proj = db_wrapper.get_project_by_id(projectID)
282      if proj is None:
283          raise HTTPException(status_code=404, detail="project not found")
284      opts = _project_options(proj)
285      access_token = decrypt_field(opts.get("whatsapp_access_token") or "")
286      phone_number_id = opts.get("whatsapp_phone_number_id") or ""
287      if not access_token or not phone_number_id:
288          return {"ok": False, "error": "WhatsApp is not configured for this project"}
289      return validate_token(access_token, phone_number_id)