whatsapp_service.py
  1  """WhatsApp message handling via AWS End User Messaging Social."""
  2  
  3  import json
  4  import logging
  5  import os
  6  
  7  import boto3
  8  
  9  logger = logging.getLogger(__name__)
 10  
 11  BUCKET_NAME = os.environ.get("S3_BUCKET", "")
 12  ATTACHMENT_PREFIX = os.environ.get("ATTACHMENT_PREFIX", "attachment_")
 13  VOICE_PREFIX = os.environ.get("VOICE_PREFIX", "voice_")
 14  VIDEO_PREFIX = os.environ.get("VIDEO_PREFIX", "video_")
 15  DOCUMENT_PREFIX = os.environ.get("DOCUMENT_PREFIX", "document_")
 16  
 17  
 18  class WhatsAppMessage:
 19      """Represents a single WhatsApp message."""
 20  
 21      def __init__(self, meta_phone_number, message, metadata=None, client=None, meta_api_version="v20.0", contact_name=""):
 22          self.meta_phone_number = meta_phone_number
 23          self.phone_number_arn = meta_phone_number.get("arn", "")
 24          # phone-number-id-976c72a700aac43eaf573ae050example
 25          self.phone_number_id = self.phone_number_arn.split(":")[-1].replace("/", "-")
 26          self.message = message
 27          self.metadata = metadata or {}
 28          self.client = client or boto3.client("socialmessaging")
 29          self.s3_client = boto3.client("s3")
 30          self.phone_number = message.get("from", "")
 31          self.meta_api_version = meta_api_version
 32          self.message_id = message.get("id", "")
 33          self.contact_name = contact_name
 34  
 35      def get_text(self) -> str:
 36          return self.message.get("text", {}).get("body", "")
 37  
 38      def get_message_type(self) -> str:
 39          return self.message.get("type", "text")
 40  
 41      def get_image(self, download=True) -> dict:
 42          """Get image message data, optionally downloading to S3."""
 43          image = self.message.get("image")
 44          if not image:
 45              return {}
 46          result = {
 47              "media_id": image.get("id", ""),
 48              "mime_type": image.get("mime_type", ""),
 49              "caption": image.get("caption", ""),
 50          }
 51          if download and result["media_id"]:
 52              media = self._download_media(result["media_id"], ATTACHMENT_PREFIX)
 53              result.update(media)
 54          return result
 55  
 56      def get_audio(self, download=True) -> dict:
 57          """Get audio message data, optionally downloading to S3."""
 58          audio = self.message.get("audio")
 59          if not audio:
 60              return {}
 61          result = {
 62              "media_id": audio.get("id", ""),
 63              "mime_type": audio.get("mime_type", ""),
 64          }
 65          if download and result["media_id"]:
 66              media = self._download_media(result["media_id"], VOICE_PREFIX)
 67              result.update(media)
 68          return result
 69  
 70      def get_video(self, download=True) -> dict:
 71          """Get video message data, optionally downloading to S3."""
 72          video = self.message.get("video")
 73          if not video:
 74              return {}
 75          result = {
 76              "media_id": video.get("id", ""),
 77              "mime_type": video.get("mime_type", ""),
 78              "caption": video.get("caption", ""),
 79          }
 80          if download and result["media_id"]:
 81              media = self._download_media(result["media_id"], VIDEO_PREFIX)
 82              result.update(media)
 83          return result
 84  
 85      def get_document(self, download=True) -> dict:
 86          """Get document message data, optionally downloading to S3."""
 87          document = self.message.get("document")
 88          if not document:
 89              return {}
 90          result = {
 91              "media_id": document.get("id", ""),
 92              "mime_type": document.get("mime_type", ""),
 93              "filename": document.get("filename", ""),
 94              "caption": document.get("caption", ""),
 95          }
 96          if download and result["media_id"]:
 97              media = self._download_media(result["media_id"], DOCUMENT_PREFIX)
 98              result.update(media)
 99          return result
100  
101      def _download_media(self, media_id: str, prefix: str) -> dict:
102          """Download media from WhatsApp to S3 via social-messaging API."""
103          try:
104              response = self.client.get_whatsapp_message_media(
105                  mediaId=media_id,
106                  originationPhoneNumberId=self.phone_number_id,
107                  destinationS3File={
108                      "bucketName": BUCKET_NAME,
109                      "key": prefix,
110                  },
111              )
112              extension = response.get("mimeType", "").split("/")[-1]
113              location = f"s3://{BUCKET_NAME}/{prefix}{media_id}.{extension}"
114              logger.info("Media downloaded to: %s", location)
115              return {"s3_url": location, "s3_bucket": BUCKET_NAME, "s3_key": f"{prefix}{media_id}.{extension}"}
116          except Exception as e:
117              logger.error("Failed to download media %s: %s", media_id, str(e))
118              return {}
119  
120      def text_reply(self, text_message: str):
121          """Send a text reply to the user."""
122          message_object = {
123              "messaging_product": "whatsapp",
124              "recipient_type": "individual",
125              "context": {"message_id": self.message_id},
126              "to": f"+{self.phone_number}",
127              "type": "text",
128              "text": {"preview_url": False, "body": text_message},
129          }
130          try:
131              self.client.send_whatsapp_message(
132                  originationPhoneNumberId=self.phone_number_id,
133                  metaApiVersion=self.meta_api_version,
134                  message=bytes(json.dumps(message_object), "utf-8"),
135              )
136          except Exception as e:
137              logger.error("Failed to send reply: %s", str(e))
138  
139      def mark_as_read(self):
140          """Mark the message as read."""
141          status_object = {
142              "messaging_product": "whatsapp",
143              "message_id": self.message_id,
144              "status": "read",
145          }
146          try:
147              self.client.send_whatsapp_message(
148                  originationPhoneNumberId=self.phone_number_arn,
149                  metaApiVersion=self.meta_api_version,
150                  message=bytes(json.dumps(status_object), "utf-8"),
151              )
152          except Exception as e:
153              logger.error("Failed to mark as read: %s", str(e))
154  
155      def reaction(self, emoji: str):
156          """Send a reaction emoji to the message."""
157          reaction_object = {
158              "messaging_product": "whatsapp",
159              "recipient_type": "individual",
160              "to": f"+{self.phone_number}",
161              "type": "reaction",
162              "reaction": {"message_id": self.message_id, "emoji": emoji},
163          }
164          try:
165              self.client.send_whatsapp_message(
166                  originationPhoneNumberId=self.phone_number_arn,
167                  metaApiVersion=self.meta_api_version,
168                  message=bytes(json.dumps(reaction_object), "utf-8"),
169              )
170          except Exception as e:
171              logger.error("Failed to send reaction: %s", str(e))
172  
class WhatsAppService:
    """Parses SNS events from AWS End User Messaging Social.

    After construction, ``messages`` holds one ``WhatsAppMessage`` per entry
    found under the ``"messages"`` changes of the webhook payload.
    """

    def __init__(self, sns_message: dict):
        """Build the message list from a decoded SNS message.

        Args:
            sns_message: Dict with an optional ``"context"`` entry (whose
                ``"MetaPhoneNumberIds"`` list is used to look up phone-number
                records) and a ``"whatsAppWebhookEntry"`` entry, given either
                as a dict or as a JSON string.
        """
        self.messages = []
        self.context = sns_message.get("context", {})
        self.meta_phone_number_ids = self.context.get("MetaPhoneNumberIds", [])

        webhook_entry = sns_message.get("whatsAppWebhookEntry", {})
        if isinstance(webhook_entry, str):
            webhook_entry = json.loads(webhook_entry)

        # One client shared by every message of this event; created on first
        # use so events that carry no messages never build a client.
        client = None

        for change in webhook_entry.get("changes", []):
            if change.get("field") != "messages":
                continue
            value = change.get("value", {})
            metadata = value.get("metadata", {})
            phone_number = self._get_phone_number_arn(metadata.get("phone_number_id", ""))
            contacts = value.get("contacts", [])
            for message in value.get("messages", []):
                if client is None:
                    client = boto3.client("socialmessaging")
                self.messages.append(
                    WhatsAppMessage(
                        phone_number,
                        message,
                        metadata,
                        client=client,
                        contact_name=self._contact_name_for(contacts, message),
                    )
                )

    @staticmethod
    def _contact_name_for(contacts: list, message: dict) -> str:
        """Return the profile name of the contact that sent ``message``.

        The contact whose ``"wa_id"`` equals the message's ``"from"`` is
        preferred, so a batch with several senders gets the right name on each
        message. When no contact matches, the first contact's name is used;
        with no contacts at all the result is ``""``.
        """
        if not contacts:
            return ""
        sender = message.get("from", "")
        if sender:
            for contact in contacts:
                if contact.get("wa_id") == sender:
                    return contact.get("profile", {}).get("name", "")
        return contacts[0].get("profile", {}).get("name", "")

    def _get_phone_number_arn(self, phone_number_id: str) -> dict:
        """Find the phone-number record matching a Meta phone number id.

        Despite the name, the whole record (a dict) from the SNS context is
        returned, not just its ARN; ``{}`` is returned when nothing matches.
        """
        return next(
            (
                record
                for record in self.meta_phone_number_ids
                if record.get("metaPhoneNumberId") == phone_number_id
            ),
            {},
        )
204          return {}