lambda_function.py
"""Receiver Lambda: save messages to DynamoDB for tumbling window aggregation.

Processes incoming WhatsApp messages from SNS. For each message:
1. Mark as read + reaction (immediate user feedback)
2. Download media to S3 (WhatsApp media URLs expire)
3. Save to DynamoDB (DDB Stream + tumbling window triggers the processor)

Buffering pattern based on:
https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
"""

import json
import logging
import os
import re
import time

import boto3

from whatsapp_service import WhatsAppService

logger = logging.getLogger()
logger.setLevel(logging.INFO)

TABLE_NAME = os.environ.get("TABLE_NAME", "")
S3_BUCKET = os.environ.get("S3_BUCKET", "")

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(TABLE_NAME) if TABLE_NAME else None

SUPPORTED_TYPES = {"text", "image", "audio", "video", "document"}

# Value added to the current epoch time to populate the item's "ttl" attribute.
ITEM_TTL_SECONDS = 86400

# Message type -> name of the message method that fetches that media.
_MEDIA_GETTERS = {
    "image": "get_image",
    "audio": "get_audio",
    "video": "get_video",
    "document": "get_document",
}

# Media types whose caption is copied onto the item (audio is not).
_CAPTIONED_TYPES = {"image", "video", "document"}

# Compiled once at import time; used by _sanitize_filename.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^a-zA-Z0-9\s\-\(\)\[\]]")
_WHITESPACE_RUN = re.compile(r"\s+")


def _best_effort(action, message, func, *args):
    """Call ``func(*args)`` and log, rather than raise, if it fails.

    Used for user-feedback calls whose failure must not prevent the message
    from being saved.
    """
    try:
        func(*args)
    except Exception:
        logger.warning(
            "Could not %s for message %s",
            action,
            getattr(message, "message_id", "unknown"),
            exc_info=True,
        )


def _sanitize_filename(raw_name):
    """Return a display name for a document, without extension.

    The text after the last "." is dropped, every character outside
    letters, digits, whitespace, ``-``, ``()`` and ``[]`` becomes a space,
    and whitespace runs are collapsed. Falls back to ``"document"`` when
    the input is missing/empty or nothing survives the cleanup.
    """
    # `or` (not a .get default) so an explicit None is handled too.
    raw_name = raw_name or "document"
    name_no_ext = raw_name.rsplit(".", 1)[0]
    cleaned = _UNSAFE_FILENAME_CHARS.sub(" ", name_no_ext).strip()
    return _WHITESPACE_RUN.sub(" ", cleaned) or "document"


def _media_fields(message, msg_type):
    """Download the media of a non-text message and build its item fields.

    Args:
        message: Message object exposing ``get_image``/``get_audio``/
            ``get_video``/``get_document`` (called with ``download=True``).
        msg_type: One of the keys of ``_MEDIA_GETTERS``.

    Returns:
        A dict with an optional ``"caption"`` and an optional ``"media_ref"``
        (a JSON string holding ``type``, ``s3_url`` and, depending on the
        type, ``media_id`` or ``filename``).
    """
    getter = getattr(message, _MEDIA_GETTERS[msg_type])
    # NOTE(review): assumes the getter returns a dict; `or {}` guards against
    # a None return -- confirm against whatsapp_service.
    media = getter(download=True) or {}

    fields = {}
    if msg_type in _CAPTIONED_TYPES:
        caption = media.get("caption", "")
        if caption:
            fields["caption"] = caption

    s3_url = media.get("s3_url")
    if not s3_url:
        # The item is still saved (as before), but leave a trace in the logs.
        logger.warning(
            "No s3_url for %s message %s; saving without media_ref",
            msg_type,
            getattr(message, "message_id", "unknown"),
        )
        return fields

    media_ref = {"type": msg_type, "s3_url": s3_url}
    if msg_type == "audio":
        media_ref["media_id"] = media.get("media_id", "")
    elif msg_type == "document":
        media_ref["filename"] = _sanitize_filename(media.get("filename"))
    fields["media_ref"] = json.dumps(media_ref)
    return fields


def process_message(message):
    """Save message to DynamoDB for tumbling window processing.

    Sends read/reaction feedback (best effort), replies and returns early
    for unsupported message types, downloads media where applicable, and
    writes one item to ``table``.

    Args:
        message: A message object from ``WhatsAppService.messages``.

    Raises:
        RuntimeError: If ``TABLE_NAME`` is not configured (``table`` is None).
        Exception: Anything raised by the message API or by ``put_item``.
    """
    phone = message.phone_number
    msg_type = message.get_message_type()

    # Feedback is cosmetic: a failure here must not cost us the message.
    _best_effort("mark as read", message, message.mark_as_read)
    _best_effort("send reaction", message, message.reaction, "👍")

    if msg_type not in SUPPORTED_TYPES:
        message.text_reply(
            "Message type not supported. Send text, image, audio, video, or document."
        )
        return

    # Fail with a clear message before spending time on a media download.
    if table is None:
        raise RuntimeError(
            "TABLE_NAME environment variable is not set; cannot save message"
        )

    item = {
        "from_phone": phone,
        "id": message.message_id,
        "timestamp": message.message.get("timestamp", ""),
        "ttl": int(time.time()) + ITEM_TTL_SECONDS,
        "type": msg_type,
        "phone_number_id": message.phone_number_id,
        "phone_number_arn": message.phone_number_arn,
        "meta_api_version": message.meta_api_version,
    }

    if message.contact_name:
        item["contact_name"] = message.contact_name

    if msg_type == "text":
        text = message.get_text()
        if text:
            item["text"] = text
    else:
        item.update(_media_fields(message, msg_type))

    table.put_item(Item=item)
    # NOTE(review): this logs the sender's phone number -- confirm that is
    # acceptable for the log retention policy.
    logger.info(
        "Saved message %s from %s (type=%s)", message.message_id, phone, msg_type
    )


def lambda_handler(event, context):
    """Entry point: receives SNS events from AWS End User Messaging Social.

    Each record's ``Sns.Message`` JSON is parsed into a ``WhatsAppService``
    and every message it yields is passed to ``process_message``. Errors are
    logged and never propagated; a failing message does not stop the other
    messages of the same record from being processed.

    Args:
        event: Lambda event dict; its ``"Records"`` list is iterated.
        context: Lambda context object (unused).

    Returns:
        ``{"statusCode": 200}`` in all cases.
    """
    for record in event.get("Records", []):
        try:
            sns_message_str = record.get("Sns", {}).get("Message", "{}")
            sns_message = json.loads(sns_message_str)
            whatsapp = WhatsAppService(sns_message)
            # Materialize inside the try so a lazy iterable cannot raise
            # outside of it.
            messages = list(whatsapp.messages)
        except Exception as e:
            logger.error("Error processing record: %s", str(e), exc_info=True)
            continue

        for message in messages:
            # Isolate failures per message so one bad message does not drop
            # the remaining ones in this record.
            try:
                process_message(message)
            except Exception as e:
                logger.error(
                    "Error processing record: message %s failed: %s",
                    getattr(message, "message_id", "unknown"),
                    str(e),
                    exc_info=True,
                )

    return {"statusCode": 200}