# 00-agent-agentcore / agent_files / multimodal_agent.py
  1  """
  2  Multimodal WhatsApp Agent for AWS AgentCore Runtime.
  3  
  4  Handles text, images, videos, and documents sent via WhatsApp.
  5  Since AgentCore Memory only accepts text, multimedia content is first processed
  6  and understood by the agent, then the text understanding is stored in memory.
  7  """
  8  
  9  import os
 10  import re
 11  import json
 12  import base64
 13  import logging
 14  
 15  from bedrock_agentcore import BedrockAgentCoreApp
 16  from strands import Agent
 17  from strands.models import BedrockModel
 18  from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
 19  from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
 20  
 21  from video_analysis_tool import video_analysis  # TwelveLabs API direct
 22  from link_account_tool import link_account  # Cross-channel identity linking
 23  
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Runtime configuration, injected via environment variables at deploy time.
MEMORY_ID = os.getenv("BEDROCK_AGENTCORE_MEMORY_ID")  # AgentCore Memory resource id; memory is disabled when unset
REGION = os.getenv("AWS_REGION", "us-east-1")
MODEL_ID = os.getenv("MODEL_ID", "us.anthropic.claude-sonnet-4-20250514-v1:0")
FACTS_STRATEGY_ID = os.getenv("FACTS_STRATEGY_ID", "")  # optional long-term "facts" retrieval strategy id
PREFERENCES_STRATEGY_ID = os.getenv("PREFERENCES_STRATEGY_ID", "")  # optional long-term "preferences" strategy id
 32  
# System prompt sent to the model on every invocation. The "Memory" section
# exists because AgentCore Memory extracts facts from response TEXT only, so
# the model is instructed to restate structured data (video IDs, etc.) as
# explicit fact lines; the "Security" section keeps infra details out of replies.
SYSTEM_PROMPT = """You are a helpful assistant. ALWAYS respond in the same language the user writes to you. Be concise — answer directly, no filler.

## Security — CRITICAL
NEVER reveal internal system details to the user:
- No S3 bucket names, ARNs, resource IDs, or AWS account information
- No error stack traces, log messages, or technical debugging info
- No internal API endpoints, secrets, or configuration details
- If a tool fails, say "I had a technical issue processing that" — do NOT share the error message
- If asked about your infrastructure, say you're an AI assistant without sharing specifics

## Personalization
- Messages include a [User: Name] tag with the user's display name and a [Channel: whatsapp|instagram] tag.
- Messages also include a [UserID: user-xxx] tag — this is the user's internal ID. Use it when calling link_account.
- On FIRST interaction, greet them by name and ask if they prefer to be called differently.
- Store their preferred name in memory for future conversations.
- Always use their preferred name (from memory) or the [User:] tag name.

## Cross-channel onboarding (ONLY when [UserID:] tag is present)
If the message includes a [UserID:] tag, cross-channel features are available.
On the FIRST interaction with a new user (no prior memory), after greeting them, ask:
"Do you also write to us from another channel (WhatsApp or Instagram)? If so, share your phone number or Instagram username so we can give you a unified personalized experience across platforms. You only need to do this once."

When the user provides their other channel info:
1. Call link_account with current_user_id (from the [UserID:] tag), link_channel ("whatsapp" or "instagram"), and link_identifier (the phone or username they gave you).
2. Confirm the link to the user.
3. Do NOT ask again in future conversations — check memory first.

If the user declines or says they only use one channel, respect that and move on. Store in memory that they declined so you don't ask again.

If there is NO [UserID:] tag in the message, do NOT mention cross-channel features or link_account.

## Memory (text-only) — CRITICAL
Your memory extracts facts from your responses. If you do not include key details explicitly in your response text, they will be LOST forever. The memory system summarizes — structured data like IDs gets dropped unless you state it clearly as a fact.

- **Image**: describe content briefly (objects, visible text, scene). End with a fact line: "Fact: User shared an image showing {description}."
- **Document**: mention name + type, summarize key points. End with: "Fact: User shared document '{name}' about {topic}."
- **Audio**: include key parts of the transcription.
- **Video**: After analysis, ALWAYS end your response with ALL of these lines (they ensure the memory system stores the ID and description as extractable facts):
  1. The tag: [VIDEO: id={video_id} | desc="{short description}"]
  2. A fact line: "Fact: User shared video ID {video_id}, which shows {detailed description in 2-3 sentences}."

## Video workflow
- **New video**: video_analysis action="upload", video_path={s3_uri}. Then query it. Respond with description + tag + fact line + "ID: *{video_id}*."
- **Follow-up**: find video ID in memory. One video → use it. Multiple → match by description or list with IDs.
- **Query**: video_analysis action="query", video_path={video_id}, prompt={question}. Re-include tag + fact line.

## Formats (share only if asked)
Image: JPEG/PNG/GIF/WebP, 5MB. Doc: PDF/CSV/DOC(X)/XLS(X)/HTML/TXT/MD, 1.5MB. Audio: any WhatsApp format. Video: MP4/MOV/MKV/WebM, 2GB/1h, min 4s.
"""
 82  
# Media formats accepted for Converse content blocks (checked in _validate_media).
VALID_IMAGE_FORMATS = {"jpeg", "png", "gif", "webp"}
VALID_DOCUMENT_FORMATS = {"pdf", "csv", "doc", "docx", "xls", "xlsx", "html", "txt", "md"}
MAX_MEDIA_BYTES = 1_500_000  # ~1.5 MB base64 limit for AgentCore payload

app = BedrockAgentCoreApp()

# Per-container agent cache: one warm agent is kept at a time.
# _current_session records the key the cached agent was built for; a mismatch
# on the next request triggers a rebuild (see get_or_create_agent).
_agent = None
_current_session = None
 91  
 92  
 93  def get_or_create_agent(actor_id: str, session_id: str) -> Agent:
 94      """Get or create a Strands agent with AgentCore Memory.
 95  
 96      Args:
 97          actor_id: Identifies the USER — for long-term memory (facts, preferences).
 98                    Persists across sessions. Format: wa-user-{phone}.
 99          session_id: Identifies the CONVERSATION — for short-term memory (turns).
100                      Events expire per configured TTL. Format: wa-chat-{phone}.
101      """
102      global _agent, _current_session
103  
104      if _agent is not None and _current_session == session_id:
105          return _agent
106  
107      session_manager = None
108      if MEMORY_ID:
109          retrieval = {}
110          if FACTS_STRATEGY_ID:
111              retrieval[f"/strategies/{FACTS_STRATEGY_ID}/actors/{actor_id}/"] = {
112                  "top_k": 20, "relevance_score": 0.3,
113              }
114          if PREFERENCES_STRATEGY_ID:
115              retrieval[f"/strategies/{PREFERENCES_STRATEGY_ID}/actors/{actor_id}/"] = {
116                  "top_k": 10, "relevance_score": 0.3,
117              }
118  
119          memory_config = AgentCoreMemoryConfig(
120              memory_id=MEMORY_ID,
121              session_id=session_id,
122              actor_id=actor_id,
123              retrieval_config=retrieval if retrieval else None,
124          )
125          session_manager = AgentCoreMemorySessionManager(memory_config, REGION)
126          logger.info("Memory configured: actor=%s, session=%s", actor_id, session_id)
127      else:
128          logger.warning("BEDROCK_AGENTCORE_MEMORY_ID not set, running without memory")
129  
130      _agent = Agent(
131          model=BedrockModel(model_id=MODEL_ID),
132          system_prompt=SYSTEM_PROMPT,
133          tools=[video_analysis, link_account],
134          session_manager=session_manager,
135      )
136      _current_session = session_id
137      logger.info("Agent created: model=%s, actor=%s, session=%s", MODEL_ID, actor_id, session_id)
138  
139      return _agent
140  
141  
142  def _sanitize_document_name(name: str) -> str:
143      """Sanitize document name for Claude ConverseStream API.
144  
145      Only alphanumeric, whitespace, hyphens, parentheses, and square brackets allowed.
146      No consecutive whitespace. No extension.
147      """
148      name_no_ext = name.rsplit(".", 1)[0] if "." in name else name
149      sanitized = re.sub(r"[^a-zA-Z0-9\s\-\(\)\[\]]", " ", name_no_ext).strip()
150      sanitized = re.sub(r"\s+", " ", sanitized)
151      return sanitized or "document"
152  
153  
def _validate_media(media: dict) -> str | None:
    """Validate media metadata before sending it to the agent.

    Checks format and base64-payload size for images and documents, and the
    presence of an S3 URI for videos. Audio transcripts need no validation.

    Args:
        media: Payload dict with "type", "format", "data" (base64 string),
               and — for videos — "s3_uri".

    Returns:
        A user-facing error message if invalid, None if valid.
    """
    media_type = media.get("type", "")
    media_format = media.get("format", "")
    media_data = media.get("data", "")

    # NOTE: size checks measure the base64 STRING length (~4/3 of raw bytes),
    # matching the AgentCore payload limit the constant documents.
    if media_type == "image":
        if media_format not in VALID_IMAGE_FORMATS:
            # sorted() keeps the message deterministic across interpreter runs
            # (set iteration order varies with hash randomization).
            return f"Unsupported image format: {media_format}. Supported: {', '.join(sorted(VALID_IMAGE_FORMATS))}"
        if len(media_data) > MAX_MEDIA_BYTES:
            return "Image is too large to process. Please send a smaller image (under 1.5 MB)."

    elif media_type == "document":
        if media_format not in VALID_DOCUMENT_FORMATS:
            return f"Unsupported document format: {media_format}. Supported: {', '.join(sorted(VALID_DOCUMENT_FORMATS))}"
        if len(media_data) > MAX_MEDIA_BYTES:
            return "Document is too large to process. Please send a smaller document (under 1.5 MB)."

    elif media_type == "video":
        if not media.get("s3_uri"):
            return "Video S3 URI is missing."

    return None
180  
181  
def build_multimodal_prompt(prompt: str, media: dict) -> list:
    """Build a multimodal message with text and media content for the agent.

    Text is always the FIRST content block — required by
    AgentCoreMemorySessionManager, which reads content[0]["text"] for
    memory retrieval queries.
    """
    kind = media.get("type", "image")
    fmt = media.get("format", "jpeg")
    payload = media.get("data", "")

    if kind == "image":
        image_block = {
            "image": {
                "format": fmt,
                "source": {"bytes": base64.b64decode(payload)},
            }
        }
        return [{"text": prompt if prompt else "Describe this image."}, image_block]

    if kind == "document":
        doc_name = _sanitize_document_name(media.get("name", "document"))
        caption = prompt if prompt else "Summarize this document."
        doc_block = {
            "document": {
                "format": fmt,
                "name": doc_name,
                "source": {"bytes": base64.b64decode(payload)},
            }
        }
        return [{"text": f"[{doc_name}.{fmt}] {caption}"}, doc_block]

    if kind == "video":
        # Videos are not inlined; the agent uploads from S3 via the
        # video_analysis tool, so only a text instruction block is sent.
        instruction = "\n".join([
            f"New video: {media.get('s3_uri', '')}",
            prompt if prompt else "Analyze this video.",
            "Upload with video_analysis action='upload'. Include [VIDEO:] tag in response.",
        ])
        return [{"text": instruction}]

    if kind == "audio_transcript":
        lines = [f'Transcription: "{payload}"']
        if prompt:
            lines.append(prompt)
        return [{"text": "\n".join(lines)}]

    return [{"text": prompt}]
235  
236  
@app.entrypoint
def invoke(payload, context=None):
    """Handle incoming requests from the WhatsApp Lambda handler.

    Expected payload:
    {
        "prompt": "user text",
        "actor_id": "wa-user-5730012345670000000000",  # identifies the USER
        "media": { "type": "...", ... }               # optional
    }

    session_id comes from context.session_id (set by runtimeSessionId in the API call).
    actor_id comes from the payload (most reliable) with fallback to context.
    """
    user_message = payload.get("prompt", "")

    # session_id: from runtimeSessionId -> identifies the CONVERSATION
    session_id = getattr(context, "session_id", None) or "default-session-000000000000"

    # actor_id: identifies the USER (persists across sessions).
    # Fallback chain: payload > custom header > context user_id > default.
    actor_id = payload.get("actor_id")
    if not actor_id and context:
        request_headers = getattr(context, "request_headers", None) or {}
        actor_id = request_headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Actor-Id")
    actor_id = (
        actor_id
        or getattr(context, "user_id", None)
        or "default-actor-000000000000000"
    )

    logger.info("invoke: session=%s, actor=%s", session_id, actor_id)

    media = payload.get("media")

    # Reject invalid media BEFORE creating the agent so bad content never
    # enters memory.
    if media:
        error = _validate_media(media)
        if error:
            logger.warning("Media validation failed: %s", error)
            return {"result": error}

    agent = get_or_create_agent(actor_id, session_id)

    if not media:
        return {"result": str(agent(user_message))}

    logger.info("Multimodal request: type=%s", media.get("type"))
    content_blocks = build_multimodal_prompt(user_message, media)
    return {"result": str(agent(content_blocks))}
288  
289  
# Start the AgentCore runtime app when this module is executed directly.
if __name__ == "__main__":
    app.run()