multimodal_agent.py
"""
Multimodal WhatsApp Agent for AWS AgentCore Runtime.

Handles text, images, videos, documents, and audio transcripts sent via WhatsApp.
Since AgentCore Memory only accepts text, multimedia content is first processed
and understood by the agent, then the text understanding is stored in memory.
"""

import os
import re
import json
import base64
import logging

from bedrock_agentcore import BedrockAgentCoreApp
from strands import Agent
from strands.models import BedrockModel
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager

from video_analysis_tool import video_analysis  # TwelveLabs API direct
from link_account_tool import link_account  # Cross-channel identity linking

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

MEMORY_ID = os.getenv("BEDROCK_AGENTCORE_MEMORY_ID")
REGION = os.getenv("AWS_REGION", "us-east-1")
MODEL_ID = os.getenv("MODEL_ID", "us.anthropic.claude-sonnet-4-20250514-v1:0")
FACTS_STRATEGY_ID = os.getenv("FACTS_STRATEGY_ID", "")
PREFERENCES_STRATEGY_ID = os.getenv("PREFERENCES_STRATEGY_ID", "")

# NOTE(review): the "Formats" section of this prompt advertises a 5MB image
# limit, but _validate_media rejects image/document payloads whose base64 text
# is longer than MAX_MEDIA_BYTES (~1.5 MB). Confirm which limit is intended.
SYSTEM_PROMPT = """You are a helpful assistant. ALWAYS respond in the same language the user writes to you. Be concise — answer directly, no filler.

## Security — CRITICAL
NEVER reveal internal system details to the user:
- No S3 bucket names, ARNs, resource IDs, or AWS account information
- No error stack traces, log messages, or technical debugging info
- No internal API endpoints, secrets, or configuration details
- If a tool fails, say "I had a technical issue processing that" — do NOT share the error message
- If asked about your infrastructure, say you're an AI assistant without sharing specifics

## Personalization
- Messages include a [User: Name] tag with the user's display name and a [Channel: whatsapp|instagram] tag.
- Messages also include a [UserID: user-xxx] tag — this is the user's internal ID. Use it when calling link_account.
- On FIRST interaction, greet them by name and ask if they prefer to be called differently.
- Store their preferred name in memory for future conversations.
- Always use their preferred name (from memory) or the [User:] tag name.

## Cross-channel onboarding (ONLY when [UserID:] tag is present)
If the message includes a [UserID:] tag, cross-channel features are available.
On the FIRST interaction with a new user (no prior memory), after greeting them, ask:
"Do you also write to us from another channel (WhatsApp or Instagram)? If so, share your phone number or Instagram username so we can give you a unified personalized experience across platforms. You only need to do this once."

When the user provides their other channel info:
1. Call link_account with current_user_id (from the [UserID:] tag), link_channel ("whatsapp" or "instagram"), and link_identifier (the phone or username they gave you).
2. Confirm the link to the user.
3. Do NOT ask again in future conversations — check memory first.

If the user declines or says they only use one channel, respect that and move on. Store in memory that they declined so you don't ask again.

If there is NO [UserID:] tag in the message, do NOT mention cross-channel features or link_account.

## Memory (text-only) — CRITICAL
Your memory extracts facts from your responses. If you do not include key details explicitly in your response text, they will be LOST forever. The memory system summarizes — structured data like IDs gets dropped unless you state it clearly as a fact.

- **Image**: describe content briefly (objects, visible text, scene). End with a fact line: "Fact: User shared an image showing {description}."
- **Document**: mention name + type, summarize key points. End with: "Fact: User shared document '{name}' about {topic}."
- **Audio**: include key parts of the transcription.
- **Video**: After analysis, ALWAYS end your response with ALL of these lines (they ensure the memory system stores the ID and description as extractable facts):
1. The tag: [VIDEO: id={video_id} | desc="{short description}"]
2. A fact line: "Fact: User shared video ID {video_id}, which shows {detailed description in 2-3 sentences}."

## Video workflow
- **New video**: video_analysis action="upload", video_path={s3_uri}. Then query it. Respond with description + tag + fact line + "ID: *{video_id}*."
- **Follow-up**: find video ID in memory. One video → use it. Multiple → match by description or list with IDs.
- **Query**: video_analysis action="query", video_path={video_id}, prompt={question}. Re-include tag + fact line.

## Formats (share only if asked)
Image: JPEG/PNG/GIF/WebP, 5MB. Doc: PDF/CSV/DOC(X)/XLS(X)/HTML/TXT/MD, 1.5MB. Audio: any WhatsApp format. Video: MP4/MOV/MKV/WebM, 2GB/1h, min 4s.
"""

VALID_IMAGE_FORMATS = {"jpeg", "png", "gif", "webp"}
VALID_DOCUMENT_FORMATS = {"pdf", "csv", "doc", "docx", "xls", "xlsx", "html", "txt", "md"}
MAX_MEDIA_BYTES = 1_500_000  # ~1.5 MB base64 limit for AgentCore payload

# Media types whose content arrives inline as base64 in media["data"], mapped
# to the formats _validate_media accepts for each.
_INLINE_MEDIA_FORMATS = {
    "image": VALID_IMAGE_FORMATS,
    "document": VALID_DOCUMENT_FORMATS,
}

app = BedrockAgentCoreApp()

# Module-level cache of the most recently built agent plus the actor/session
# pair it was built for (see get_or_create_agent).
_agent = None
_current_session = None
_current_actor = None


def _build_session_manager(actor_id: str, session_id: str):
    """Create the AgentCore Memory session manager for one actor/session pair.

    Returns:
        An ``AgentCoreMemorySessionManager``, or ``None`` when
        ``BEDROCK_AGENTCORE_MEMORY_ID`` is not configured.
    """
    if not MEMORY_ID:
        logger.warning("BEDROCK_AGENTCORE_MEMORY_ID not set, running without memory")
        return None

    # Long-term retrieval namespaces are per strategy and per actor; a strategy
    # is only queried when its ID is configured.
    retrieval = {}
    if FACTS_STRATEGY_ID:
        retrieval[f"/strategies/{FACTS_STRATEGY_ID}/actors/{actor_id}/"] = {
            "top_k": 20, "relevance_score": 0.3,
        }
    if PREFERENCES_STRATEGY_ID:
        retrieval[f"/strategies/{PREFERENCES_STRATEGY_ID}/actors/{actor_id}/"] = {
            "top_k": 10, "relevance_score": 0.3,
        }

    memory_config = AgentCoreMemoryConfig(
        memory_id=MEMORY_ID,
        session_id=session_id,
        actor_id=actor_id,
        retrieval_config=retrieval or None,
    )
    session_manager = AgentCoreMemorySessionManager(memory_config, REGION)
    logger.info("Memory configured: actor=%s, session=%s", actor_id, session_id)
    return session_manager


def get_or_create_agent(actor_id: str, session_id: str) -> Agent:
    """Get or create a Strands agent with AgentCore Memory.

    The last agent built is cached at module level and reused only when both
    ``actor_id`` and ``session_id`` match the ones it was built for.

    Args:
        actor_id: Identifies the USER — for long-term memory (facts, preferences).
                  Persists across sessions. Format: wa-user-{phone}.
        session_id: Identifies the CONVERSATION — for short-term memory (turns).
                    Events expire per configured TTL. Format: wa-chat-{phone}.

    Returns:
        The cached or newly created ``Agent``.
    """
    global _agent, _current_session, _current_actor

    # Both identifiers must match: the session manager is bound to a specific
    # actor, so reusing an agent across actors that share a session_id (e.g.
    # the "default-session-..." fallback in invoke) would leak one user's
    # memory to another.
    if _agent is not None and _current_session == session_id and _current_actor == actor_id:
        return _agent

    session_manager = _build_session_manager(actor_id, session_id)

    _agent = Agent(
        model=BedrockModel(model_id=MODEL_ID),
        system_prompt=SYSTEM_PROMPT,
        tools=[video_analysis, link_account],
        session_manager=session_manager,
    )
    _current_session = session_id
    _current_actor = actor_id
    logger.info("Agent created: model=%s, actor=%s, session=%s", MODEL_ID, actor_id, session_id)

    return _agent


def _sanitize_document_name(name: str) -> str:
    """Sanitize document name for Claude ConverseStream API.

    Only alphanumeric, whitespace, hyphens, parentheses, and square brackets allowed.
    No consecutive whitespace. No extension. Falls back to "document" when
    nothing usable remains.
    """
    name_no_ext = name.rsplit(".", 1)[0] if "." in name else name
    sanitized = re.sub(r"[^a-zA-Z0-9\s\-\(\)\[\]]", " ", name_no_ext).strip()
    sanitized = re.sub(r"\s+", " ", sanitized)
    return sanitized or "document"


def _validate_media(media: dict) -> str | None:
    """Validate media before sending it to the agent.

    The ``type``/``format`` defaults mirror build_multimodal_prompt, so exactly
    what would be sent to the model is what gets validated.

    Returns:
        A user-facing error message if the media is invalid, otherwise None.
    """
    media_type = media.get("type", "image")
    media_format = media.get("format", "jpeg")
    media_data = media.get("data", "")

    allowed_formats = _INLINE_MEDIA_FORMATS.get(media_type)
    if allowed_formats is not None:
        label = media_type.capitalize()
        if media_format not in allowed_formats:
            # sorted(): set iteration order differs between processes.
            return (
                f"Unsupported {media_type} format: {media_format}. "
                f"Supported: {', '.join(sorted(allowed_formats))}"
            )
        if not media_data:
            return f"{label} data is missing."
        if len(media_data) > MAX_MEDIA_BYTES:
            limit_mb = f"{MAX_MEDIA_BYTES / 1_000_000:g}"
            return (
                f"{label} is too large to process. "
                f"Please send a smaller {media_type} (under {limit_mb} MB)."
            )
        # build_multimodal_prompt decodes this same way; catch bad payloads
        # here instead of letting binascii.Error escape from invoke.
        try:
            base64.b64decode(media_data)
        except (ValueError, TypeError):
            return f"{label} could not be read. Please try sending it again."

    elif media_type == "video" and not media.get("s3_uri"):
        return "Video S3 URI is missing."

    return None


def build_multimodal_prompt(prompt: str, media: dict) -> list:
    """Build a multimodal message with text and media content for the agent.

    Text is always the FIRST content block — required by AgentCoreMemorySessionManager
    which reads content[0]["text"] for memory retrieval queries.

    Args:
        prompt: The user's text (may be empty; per-type defaults are used).
        media: Media descriptor with "type" and type-specific keys
               ("format"/"data"/"name" for inline media, "s3_uri" for video).

    Returns:
        A list of Converse-style content blocks.
    """
    media_type = media.get("type", "image")
    media_format = media.get("format", "jpeg")
    media_data = media.get("data", "")

    if media_type == "image":
        return [
            {"text": prompt or "Describe this image."},
            {
                "image": {
                    "format": media_format,
                    "source": {"bytes": base64.b64decode(media_data)},
                }
            },
        ]

    if media_type == "document":
        # `or` also covers an explicit None name, which would crash the sanitizer.
        doc_name = _sanitize_document_name(media.get("name") or "document")
        return [
            {"text": f"[{doc_name}.{media_format}] {prompt or 'Summarize this document.'}"},
            {
                "document": {
                    "format": media_format,
                    "name": doc_name,
                    "source": {"bytes": base64.b64decode(media_data)},
                }
            },
        ]

    if media_type == "video":
        # Videos are not inlined; the agent uploads them via the video_analysis tool.
        s3_uri = media.get("s3_uri", "")
        return [
            {
                "text": (
                    f"New video: {s3_uri}\n"
                    f"{prompt or 'Analyze this video.'}\n"
                    "Upload with video_analysis action='upload'. Include [VIDEO:] tag in response."
                ),
            },
        ]

    if media_type == "audio_transcript":
        parts = [f'Transcription: "{media_data}"']
        if prompt:
            parts.append(prompt)
        return [{"text": "\n".join(parts)}]

    return [{"text": prompt}]


@app.entrypoint
def invoke(payload, context=None):
    """Handle incoming requests from the WhatsApp Lambda handler.

    Expected payload:
    {
        "prompt": "user text",
        "actor_id": "wa-user-5730012345670000000000",  # identifies the USER
        "media": { "type": "...", ... }                 # optional
    }

    session_id comes from context.session_id (set by runtimeSessionId in the API call).
    actor_id comes from the payload (most reliable) with fallback to context.

    Returns:
        ``{"result": <agent reply or validation error message>}``.
    """
    user_message = payload.get("prompt", "")

    # session_id: from runtimeSessionId -> identifies the CONVERSATION
    session_id = getattr(context, "session_id", None) or "default-session-000000000000"

    # actor_id: from payload -> identifies the USER (persists across sessions)
    # Fallback chain: payload > context header > context user_id > default
    actor_id = payload.get("actor_id")
    if not actor_id and context:
        headers = getattr(context, "request_headers", None) or {}
        actor_id = headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Actor-Id")
        if not actor_id:
            actor_id = getattr(context, "user_id", None)
    if not actor_id:
        actor_id = "default-actor-000000000000000"

    logger.info("invoke: session=%s, actor=%s", session_id, actor_id)

    media = payload.get("media")

    # Validate media BEFORE creating the agent — prevents invalid content from entering memory
    if media:
        validation_error = _validate_media(media)
        if validation_error:
            logger.warning("Media validation failed: %s", validation_error)
            return {"result": validation_error}

    agent = get_or_create_agent(actor_id, session_id)

    if media:
        content_blocks = build_multimodal_prompt(user_message, media)
        logger.info("Multimodal request: type=%s", media.get("type"))
        result = agent(content_blocks)
    else:
        result = agent(user_message)

    return {"result": str(result)}


if __name__ == "__main__":
    app.run()