# agentcore_service.py
"""Service for invoking AgentCore Runtime."""

import json
import logging
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class AgentCoreService:
    """Handles communication with AgentCore Runtime."""

    # IDs are right-padded with "0" up to this many characters.
    _MIN_ID_LENGTH = 33

    def __init__(self, agent_arn: str):
        """Create the ``bedrock-agentcore`` client and remember the runtime ARN.

        Args:
            agent_arn: ARN passed as ``agentRuntimeArn`` on every invocation.
        """
        self.client = boto3.client("bedrock-agentcore")
        self.agent_arn = agent_arn

    @staticmethod
    def _generate_actor_id(from_phone: str) -> str:
        """Generate actor ID identifying the USER (min 33 chars).

        Identifies who the user IS. Used for long-term memory (facts,
        preferences) that persists across all sessions.
        Based only on the user's phone number.
        """
        # NOTE(review): zero-padding makes e.g. "123" and "1230" produce the
        # same ID. Left unchanged because existing IDs may already be
        # persisted — confirm before altering the scheme.
        actor = f"wa-user-{from_phone}"
        return actor.ljust(AgentCoreService._MIN_ID_LENGTH, "0")

    @staticmethod
    def _generate_session_id(from_phone: str) -> str:
        """Generate session ID for the user's conversation (min 33 chars).

        Identifies the conversation thread. Used for short-term memory
        (conversation turns) that expires per the configured TTL.
        One session per user — WhatsApp is a single continuous thread.
        """
        session = f"wa-chat-{from_phone}"
        return session.ljust(AgentCoreService._MIN_ID_LENGTH, "0")

    @staticmethod
    def _collect_response_bytes(response: dict) -> bytes:
        """Concatenate the raw bytes of every chunk under ``response["response"]``.

        Accepts chunks that are ``bytes`` or dicts carrying a ``"bytes"`` key;
        anything else is skipped, as before.
        """
        parts = []
        # ``or ()`` also covers a key that is present but set to None.
        for chunk in response.get("response") or ():
            if isinstance(chunk, bytes):
                parts.append(chunk)
            elif isinstance(chunk, dict) and "bytes" in chunk:
                parts.append(chunk["bytes"])
        return b"".join(parts)

    @staticmethod
    def _extract_result(response_text: str):
        """Return the agent's answer from the decoded response body.

        - JSON object: its ``"result"`` value, or the raw text if absent.
        - JSON string: the decoded string.
        - Any other JSON value, or non-JSON text: the raw text unchanged.
        """
        try:
            parsed = json.loads(response_text)
        except json.JSONDecodeError:
            return response_text

        # Only objects have ``.get``; a JSON string/number/list previously
        # raised AttributeError here.
        if isinstance(parsed, dict):
            return parsed.get("result", response_text)
        if isinstance(parsed, str):
            return parsed
        return response_text

    def invoke_agent(
        self,
        from_phone: str,
        prompt: str,
        media: Optional[dict] = None,
    ) -> str:
        """Invoke AgentCore Runtime with text and optional media.

        Args:
            from_phone: Sender's phone number.
            prompt: Text prompt from the user.
            media: Optional dict with keys 'type', 'format', 'data'.
                Only attached to the payload when non-empty.

        Returns:
            Agent response text. When the body is a JSON object, the value
            of its "result" key is returned as-is (presumably a string —
            verify against the agent's contract).

        Raises:
            UnicodeDecodeError: If the response body is not valid UTF-8.
        """
        actor_id = self._generate_actor_id(from_phone)
        session_id = self._generate_session_id(from_phone)

        payload_data = {
            "prompt": prompt.strip(),
            "actor_id": actor_id,
        }
        if media:
            payload_data["media"] = media

        payload = json.dumps(payload_data).encode()

        # NOTE(review): both IDs embed the phone number, so this log line
        # contains it — confirm that is acceptable for this log destination.
        logger.info(
            "Invoking AgentCore: actor=%s, session=%s, has_media=%s",
            actor_id,
            session_id,
            bool(media),  # mirrors the truthiness check used for the payload
        )

        response = self.client.invoke_agent_runtime(
            agentRuntimeArn=self.agent_arn,
            runtimeSessionId=session_id,
            runtimeUserId=actor_id,
            payload=payload,
        )

        # Decode once over the whole body: a chunk boundary may fall inside a
        # multi-byte UTF-8 sequence, which would make per-chunk decoding fail.
        response_text = self._collect_response_bytes(response).decode("utf-8")

        return self._extract_result(response_text)