# whatsapp_service.py
"""WhatsApp message handling via AWS End User Messaging Social."""

import json
import logging
import os

import boto3

logger = logging.getLogger(__name__)

# Destination bucket and per-media-type key prefixes used when media is
# downloaded; every value can be overridden through the environment.
BUCKET_NAME = os.environ.get("S3_BUCKET", "")
ATTACHMENT_PREFIX = os.environ.get("ATTACHMENT_PREFIX", "attachment_")
VOICE_PREFIX = os.environ.get("VOICE_PREFIX", "voice_")
VIDEO_PREFIX = os.environ.get("VIDEO_PREFIX", "video_")
DOCUMENT_PREFIX = os.environ.get("DOCUMENT_PREFIX", "document_")


class WhatsAppMessage:
    """Represents a single WhatsApp message.

    Wraps one entry of a webhook ``messages`` list together with the phone
    number record it was addressed to, and offers helpers to read its
    content, download attached media and answer it.
    """

    def __init__(
        self,
        meta_phone_number,
        message,
        metadata=None,
        client=None,
        meta_api_version="v20.0",
        contact_name="",
    ):
        """Store the message and derive the identifiers used for API calls.

        Args:
            meta_phone_number: Mapping describing the receiving phone number;
                only its ``"arn"`` key is read here.
            message: Mapping holding the message payload (``"from"``,
                ``"id"``, ``"type"`` and the per-type sub-objects).
            metadata: Optional mapping kept as ``self.metadata``; a falsy
                value is replaced by an empty dict.
            client: Optional client used for the social-messaging calls.
                When falsy, ``boto3.client("socialmessaging")`` is created.
            meta_api_version: Value sent as ``metaApiVersion`` on every send.
            contact_name: Display name of the sender, stored as-is.
        """
        self.meta_phone_number = meta_phone_number
        self.phone_number_arn = meta_phone_number.get("arn", "")
        # Last colon-separated segment of the ARN with "/" turned into "-",
        # e.g. phone-number-id-976c72a700aac43eaf573ae050example
        self.phone_number_id = self.phone_number_arn.split(":")[-1].replace("/", "-")
        self.message = message
        self.metadata = metadata or {}
        self.client = client or boto3.client("socialmessaging")
        # Not used by any method in this module; kept because code outside
        # this file may rely on the attribute.
        self.s3_client = boto3.client("s3")
        self.phone_number = message.get("from", "")
        self.meta_api_version = meta_api_version
        self.message_id = message.get("id", "")
        self.contact_name = contact_name

    def get_text(self) -> str:
        """Return the text body of the message, or ``""`` when absent."""
        return self.message.get("text", {}).get("body", "")

    def get_message_type(self) -> str:
        """Return the message ``type`` field, defaulting to ``"text"``."""
        return self.message.get("type", "text")

    def get_image(self, download=True) -> dict:
        """Get image message data, optionally downloading to S3."""
        return self._media_info("image", ATTACHMENT_PREFIX, ("caption",), download)

    def get_audio(self, download=True) -> dict:
        """Get audio message data, optionally downloading to S3."""
        return self._media_info("audio", VOICE_PREFIX, (), download)

    def get_video(self, download=True) -> dict:
        """Get video message data, optionally downloading to S3."""
        return self._media_info("video", VIDEO_PREFIX, ("caption",), download)

    def get_document(self, download=True) -> dict:
        """Get document message data, optionally downloading to S3."""
        return self._media_info(
            "document", DOCUMENT_PREFIX, ("filename", "caption"), download
        )

    def _media_info(self, kind: str, prefix: str, extra_fields, download) -> dict:
        """Build the result dict shared by the ``get_<media>`` methods.

        Args:
            kind: Key of the media sub-object inside the message
                (``"image"``, ``"audio"``, ``"video"`` or ``"document"``).
            prefix: S3 key prefix handed to ``_download_media``.
            extra_fields: Names of additional string fields copied from the
                media object, in the order they should appear in the result.
            download: When truthy and a media id is present, the media is
                downloaded and the S3 details are merged into the result.

        Returns:
            ``{}`` when the message has no such media object; otherwise a
            dict with ``media_id``, ``mime_type``, the extra fields and, after
            a successful download, ``s3_url``, ``s3_bucket`` and ``s3_key``.
        """
        media = self.message.get(kind)
        if not media:
            return {}
        result = {
            "media_id": media.get("id", ""),
            "mime_type": media.get("mime_type", ""),
        }
        for field in extra_fields:
            result[field] = media.get(field, "")
        if download and result["media_id"]:
            result.update(self._download_media(result["media_id"], prefix))
        return result

    def _download_media(self, media_id: str, prefix: str) -> dict:
        """Download media from WhatsApp to S3 via social-messaging API.

        Returns:
            A dict with ``s3_url``, ``s3_bucket`` and ``s3_key`` on success,
            or ``{}`` when the call raised (the error is logged, not raised).
        """
        try:
            response = self.client.get_whatsapp_message_media(
                mediaId=media_id,
                originationPhoneNumberId=self.phone_number_id,
                destinationS3File={
                    "bucketName": BUCKET_NAME,
                    "key": prefix,
                },
            )
            # The object key is reconstructed locally from the prefix, the
            # media id and the MIME subtype; presumably this mirrors how the
            # service names the stored object - TODO confirm.
            # NOTE(review): a mimeType carrying parameters (e.g. "; codecs=...")
            # ends up verbatim in the extension - verify against real payloads.
            extension = response.get("mimeType", "").split("/")[-1]
            key = f"{prefix}{media_id}.{extension}"
            location = f"s3://{BUCKET_NAME}/{key}"
            logger.info("Media downloaded to: %s", location)
            return {"s3_url": location, "s3_bucket": BUCKET_NAME, "s3_key": key}
        except Exception as exc:
            # Best-effort: callers get an empty dict instead of an exception.
            logger.error("Failed to download media %s: %s", media_id, str(exc))
            return {}

    def _send_payload(self, origination_id: str, payload: dict, failure_message: str):
        """Serialize ``payload`` to UTF-8 JSON and send it, logging failures.

        Args:
            origination_id: Value passed as ``originationPhoneNumberId``.
            payload: JSON-serializable message object.
            failure_message: ``%s``-style log format used when the send
                raises; it receives the stringified exception.
        """
        try:
            self.client.send_whatsapp_message(
                originationPhoneNumberId=origination_id,
                metaApiVersion=self.meta_api_version,
                message=bytes(json.dumps(payload), "utf-8"),
            )
        except Exception as exc:
            # Best-effort: a failed send is logged and otherwise ignored.
            logger.error(failure_message, str(exc))

    # NOTE(review): text_reply sends the dash-form phone number id while
    # mark_as_read and reaction send the full ARN. Each method keeps the value
    # it has always used; confirm the API accepts both before unifying them.

    def text_reply(self, text_message: str):
        """Send a text reply to the user."""
        message_object = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "context": {"message_id": self.message_id},
            "to": f"+{self.phone_number}",
            "type": "text",
            "text": {"preview_url": False, "body": text_message},
        }
        self._send_payload(
            self.phone_number_id, message_object, "Failed to send reply: %s"
        )

    def mark_as_read(self):
        """Mark the message as read."""
        status_object = {
            "messaging_product": "whatsapp",
            "message_id": self.message_id,
            "status": "read",
        }
        self._send_payload(
            self.phone_number_arn, status_object, "Failed to mark as read: %s"
        )

    def reaction(self, emoji: str):
        """Send a reaction emoji to the message."""
        reaction_object = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": f"+{self.phone_number}",
            "type": "reaction",
            "reaction": {"message_id": self.message_id, "emoji": emoji},
        }
        self._send_payload(
            self.phone_number_arn, reaction_object, "Failed to send reaction: %s"
        )


class WhatsAppService:
    """Parses SNS events from AWS End User Messaging Social."""

    def __init__(self, sns_message: dict):
        """Collect every message found in ``sns_message`` into ``self.messages``.

        Args:
            sns_message: Mapping with a ``"context"`` entry (whose
                ``"MetaPhoneNumberIds"`` list is used to resolve phone number
                records) and a ``"whatsAppWebhookEntry"`` entry, given either
                as a mapping or as a JSON string.
        """
        self.messages = []
        self.context = sns_message.get("context", {})
        self.meta_phone_number_ids = self.context.get("MetaPhoneNumberIds", [])

        webhook_entry = sns_message.get("whatsAppWebhookEntry", {})
        if isinstance(webhook_entry, str):
            webhook_entry = json.loads(webhook_entry)

        for change in webhook_entry.get("changes", []):
            # Only "messages" changes carry inbound messages; skip the rest.
            if change.get("field") != "messages":
                continue
            value = change.get("value", {})
            metadata = value.get("metadata", {})
            phone_number = self._get_phone_number_arn(
                metadata.get("phone_number_id", "")
            )
            contacts = value.get("contacts", [])
            # The first contact's profile name is applied to every message
            # of this change.
            contact_name = (
                contacts[0].get("profile", {}).get("name", "") if contacts else ""
            )
            for message in value.get("messages", []):
                self.messages.append(
                    WhatsAppMessage(
                        phone_number, message, metadata, contact_name=contact_name
                    )
                )

    def _get_phone_number_arn(self, phone_number_id: str) -> dict:
        """Find the phone number ARN from the SNS context metadata.

        Despite the name, the whole matching entry of ``MetaPhoneNumberIds``
        is returned (the one whose ``"metaPhoneNumberId"`` equals
        ``phone_number_id``), or ``{}`` when none matches.
        """
        return next(
            (
                entry
                for entry in self.meta_phone_number_ids
                if entry.get("metaPhoneNumberId") == phone_number_id
            ),
            {},
        )