lambda_function.py
  1  """Receiver Lambda: save messages to DynamoDB for tumbling window aggregation.
  2  
  3  Processes incoming WhatsApp messages from SNS. For each message:
  4  1. Mark as read + reaction (immediate user feedback)
  5  2. Download media to S3 (WhatsApp media URLs expire)
  6  3. Save to DynamoDB (DDB Stream + tumbling window triggers the processor)
  7  
  8  Buffering pattern based on:
  9  https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
 10  """
 11  
 12  import json
 13  import logging
 14  import os
 15  import re
 16  import time
 17  
 18  import boto3
 19  
 20  from whatsapp_service import WhatsAppService
 21  
 22  logger = logging.getLogger()
 23  logger.setLevel(logging.INFO)
 24  
 25  TABLE_NAME = os.environ.get("TABLE_NAME", "")
 26  S3_BUCKET = os.environ.get("S3_BUCKET", "")
 27  
 28  dynamodb = boto3.resource("dynamodb")
 29  table = dynamodb.Table(TABLE_NAME) if TABLE_NAME else None
 30  
 31  SUPPORTED_TYPES = {"text", "image", "audio", "video", "document"}
 32  
 33  
 34  def process_message(message):
 35      """Save message to DynamoDB for tumbling window processing."""
 36      phone = message.phone_number
 37      msg_type = message.get_message_type()
 38  
 39      message.mark_as_read()
 40      message.reaction("👍")
 41  
 42      if msg_type not in SUPPORTED_TYPES:
 43          message.text_reply(
 44              "Message type not supported. Send text, image, audio, video, or document."
 45          )
 46          return
 47  
 48      item = {
 49          "from_phone": phone,
 50          "id": message.message_id,
 51          "timestamp": message.message.get("timestamp", ""),
 52          "ttl": int(time.time()) + 86400,
 53          "type": msg_type,
 54          "phone_number_id": message.phone_number_id,
 55          "phone_number_arn": message.phone_number_arn,
 56          "meta_api_version": message.meta_api_version,
 57      }
 58  
 59      if message.contact_name:
 60          item["contact_name"] = message.contact_name
 61  
 62      if msg_type == "text":
 63          text = message.get_text()
 64          if text:
 65              item["text"] = text
 66  
 67      elif msg_type == "image":
 68          image = message.get_image(download=True)
 69          caption = image.get("caption", "")
 70          if caption:
 71              item["caption"] = caption
 72          if image.get("s3_url"):
 73              item["media_ref"] = json.dumps({"type": "image", "s3_url": image["s3_url"]})
 74  
 75      elif msg_type == "audio":
 76          audio = message.get_audio(download=True)
 77          if audio.get("s3_url"):
 78              item["media_ref"] = json.dumps({
 79                  "type": "audio", "s3_url": audio["s3_url"],
 80                  "media_id": audio.get("media_id", ""),
 81              })
 82  
 83      elif msg_type == "video":
 84          video = message.get_video(download=True)
 85          caption = video.get("caption", "")
 86          if caption:
 87              item["caption"] = caption
 88          if video.get("s3_url"):
 89              item["media_ref"] = json.dumps({"type": "video", "s3_url": video["s3_url"]})
 90  
 91      elif msg_type == "document":
 92          doc = message.get_document(download=True)
 93          caption = doc.get("caption", "")
 94          if caption:
 95              item["caption"] = caption
 96          if doc.get("s3_url"):
 97              raw_name = doc.get("filename", "document")
 98              name_no_ext = raw_name.rsplit(".", 1)[0] if "." in raw_name else raw_name
 99              filename = re.sub(r"[^a-zA-Z0-9\s\-\(\)\[\]]", " ", name_no_ext).strip()
100              filename = re.sub(r"\s+", " ", filename) or "document"
101              item["media_ref"] = json.dumps({
102                  "type": "document", "s3_url": doc["s3_url"], "filename": filename,
103              })
104  
105      table.put_item(Item=item)
106      logger.info("Saved message %s from %s (type=%s)", message.message_id, phone, msg_type)
107  
108  
def lambda_handler(event, context):
    """Entry point: receives SNS events from AWS End User Messaging Social.

    Each SNS record is parsed into a ``WhatsAppService`` and every message it
    exposes is passed to ``process_message``. Failures are logged and never
    propagate: a record that cannot be parsed is skipped, and a message that
    fails does not stop the remaining messages of the same record.

    Args:
        event: Lambda event; its ``Records`` list holds the SNS records.
        context: Lambda context object (unused).

    Returns:
        ``{"statusCode": 200}``, regardless of individual failures.
    """
    for record in event.get("Records", []):
        try:
            payload = json.loads(record.get("Sns", {}).get("Message", "{}"))
            # Materialise inside the try so errors raised while the messages
            # are being built are caught here too.
            messages = list(WhatsAppService(payload).messages)
        except Exception as exc:
            logger.exception("Error processing record: %s", exc)
            continue

        for message in messages:
            # Isolate each message so one failure does not drop the rest of the record.
            try:
                process_message(message)
            except Exception as exc:
                logger.exception(
                    "Error processing record: message %s failed: %s",
                    getattr(message, "message_id", "<unknown>"), exc,
                )

    return {"statusCode": 200}