agentcore_service.py
 1  """Service for invoking AgentCore Runtime."""
 2  
 3  import json
 4  import logging
 5  
 6  import boto3
 7  
 8  logger = logging.getLogger(__name__)
 9  
10  
class AgentCoreService:
    """Handles communication with AgentCore Runtime.

    Wraps a boto3 ``bedrock-agentcore`` client and derives the actor and
    session identifiers sent with each invocation from the sender's phone
    number.
    """

    # Generated identifiers are right-padded with "0" up to this length.
    _MIN_ID_LENGTH = 33

    def __init__(self, agent_arn: str):
        """Create the boto3 client and remember the target runtime.

        Args:
            agent_arn: Value sent as ``agentRuntimeArn`` on every invocation.
        """
        self.client = boto3.client("bedrock-agentcore")
        self.agent_arn = agent_arn

    @staticmethod
    def _pad_identifier(identifier: str) -> str:
        """Right-pad ``identifier`` with "0" to the minimum ID length.

        Identifiers that are already long enough are returned unchanged.

        NOTE(review): because the filler is "0", two short phone numbers
        that differ only by trailing zeros (e.g. "123" and "1230") map to
        the same identifier. Changing the scheme would alter the IDs of
        existing users, so it is left as-is here.
        """
        return identifier.ljust(AgentCoreService._MIN_ID_LENGTH, "0")

    @staticmethod
    def _generate_actor_id(from_phone: str) -> str:
        """Generate actor ID identifying the USER (min 33 chars).

        Identifies who the user IS. Used for long-term memory (facts,
        preferences) that persists across all sessions.
        Based only on the user's phone number.

        Args:
            from_phone: Sender's phone number.

        Returns:
            ``wa-user-<from_phone>`` right-padded with "0" to 33 characters.
        """
        return AgentCoreService._pad_identifier(f"wa-user-{from_phone}")

    @staticmethod
    def _generate_session_id(from_phone: str) -> str:
        """Generate session ID for the user's conversation (min 33 chars).

        Identifies the conversation thread. Used for short-term memory
        (conversation turns) that expires per the configured TTL.
        One session per user — WhatsApp is a single continuous thread.

        Args:
            from_phone: Sender's phone number.

        Returns:
            ``wa-chat-<from_phone>`` right-padded with "0" to 33 characters.
        """
        return AgentCoreService._pad_identifier(f"wa-chat-{from_phone}")

    @staticmethod
    def _read_response_text(response) -> str:
        """Collect the response body of an invocation into one string.

        Accepts a body that is a single ``bytes`` object or an iterable of
        chunks, where each chunk is either ``bytes`` or a dict carrying the
        data under the ``"bytes"`` key. Chunks of any other shape are
        ignored. A missing body yields an empty string.

        Raises:
            UnicodeDecodeError: If the assembled body is not valid UTF-8.
        """
        body = response.get("response")
        if body is None:
            return ""
        if isinstance(body, (bytes, bytearray)):
            # Iterating bytes would yield ints and lose the whole body.
            return bytes(body).decode("utf-8")

        raw_chunks = []
        for chunk in body:
            if isinstance(chunk, dict):
                chunk = chunk.get("bytes")
            if isinstance(chunk, (bytes, bytearray)):
                raw_chunks.append(chunk)

        # Decode once over the joined bytes: a multi-byte UTF-8 character
        # may be split across two chunks, and decoding each chunk on its
        # own would then fail.
        return b"".join(raw_chunks).decode("utf-8")

    @staticmethod
    def _extract_result(response_text: str):
        """Pull the agent's answer out of the raw response text.

        * JSON object: the value under ``"result"``, or the raw text when
          that key is absent.
        * JSON string: the decoded string.
        * Anything else (other JSON values, non-JSON text): the raw text.
        """
        try:
            parsed = json.loads(response_text)
        except json.JSONDecodeError:
            return response_text

        if isinstance(parsed, dict):
            return parsed.get("result", response_text)
        if isinstance(parsed, str):
            return parsed
        # Numbers, lists, booleans, null: nothing to unwrap.
        return response_text

    def invoke_agent(
        self,
        from_phone: str,
        prompt: str,
        media: dict = None,
    ) -> str:
        """Invoke AgentCore Runtime with text and optional media.

        Args:
            from_phone: Sender's phone number.
            prompt: Text prompt from the user. Surrounding whitespace is
                stripped before sending.
            media: Optional dict with keys 'type', 'format', 'data'.
                ``None`` or an empty dict means no media is sent.

        Returns:
            Agent response text: the ``"result"`` field when the runtime
            answers with a JSON object, otherwise the response body itself.

        Raises:
            UnicodeDecodeError: If the response body is not valid UTF-8.
        """
        actor_id = self._generate_actor_id(from_phone)
        session_id = self._generate_session_id(from_phone)

        payload_data = {
            "prompt": prompt.strip(),
            "actor_id": actor_id,
        }
        if media:
            payload_data["media"] = media

        payload = json.dumps(payload_data).encode()

        # NOTE(review): both IDs embed the phone number, so this line
        # writes it to the logs.
        logger.info(
            "Invoking AgentCore: actor=%s, session=%s, has_media=%s",
            actor_id,
            session_id,
            # Mirrors the `if media:` check above so the log matches what
            # was actually put in the payload.
            bool(media),
        )

        response = self.client.invoke_agent_runtime(
            agentRuntimeArn=self.agent_arn,
            runtimeSessionId=session_id,
            runtimeUserId=actor_id,
            payload=payload,
        )

        response_text = self._read_response_text(response)
        return self._extract_result(response_text)