databases.py
 1  """DynamoDB tables for message buffering and unified user identity.
 2  
 3  Messages table: from_phone as PK so messages from the same user land in the
 4  same shard and are processed together by the tumbling window.
 5  
 6  Users table: canonical user_id as PK with GSIs on wa_phone and ig_id for
 7  cross-channel identity resolution. Enables shared AgentCore Memory across
 8  WhatsApp and Instagram for the same person.
 9  
10  Buffering pattern based on:
11  https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
12  """
13  
14  from constructs import Construct
15  from aws_cdk import (
16      RemovalPolicy,
17      aws_dynamodb as ddb,
18  )
19  
20  
class MessageDatabase(Construct):
    """Message buffer table whose stream feeds tumbling window aggregation.

    Items are keyed by ``from_phone`` (partition key) and ``id`` (sort key),
    both strings. The table uses on-demand billing, publishes NEW_IMAGE
    stream records, treats the ``ttl`` attribute as its time-to-live field,
    and is created with the DESTROY removal policy.

    The resulting table is exposed as ``self.table``.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Key schema: one partition per sender, one item per message id.
        string_type = ddb.AttributeType.STRING
        sender_key = ddb.Attribute(name="from_phone", type=string_type)
        message_key = ddb.Attribute(name="id", type=string_type)

        self.table = ddb.Table(
            self,
            "MessagesTable",
            partition_key=sender_key,
            sort_key=message_key,
            stream=ddb.StreamViewType.NEW_IMAGE,
            time_to_live_attribute="ttl",
            billing_mode=ddb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY,
        )
37  
38  
class UserIdentityDatabase(Construct):
    """Cross-channel identity table keyed by a canonical ``user_id``.

    Each item ties a WhatsApp phone number and/or an Instagram ID to one
    canonical user_id, which is used as the actor_id in AgentCore Memory.
    A person who writes in from both channels therefore shares a single
    long-term memory.

    Two global secondary indexes support lookups from the channel side:
    ``wa-phone-index`` on ``wa_phone`` and ``ig-id-index`` on ``ig_id``.
    The resulting table is exposed as ``self.table``.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        string_type = ddb.AttributeType.STRING

        self.table = ddb.Table(
            self,
            "UnifiedUsersTable",
            partition_key=ddb.Attribute(name="user_id", type=string_type),
            removal_policy=RemovalPolicy.DESTROY,
            billing_mode=ddb.BillingMode.PAY_PER_REQUEST,
        )

        # (index name, key attribute) per channel; the WhatsApp phone number
        # index is registered first, then the Instagram scoped ID index.
        channel_lookups = (
            ("wa-phone-index", "wa_phone"),
            ("ig-id-index", "ig_id"),
        )
        for lookup_index, lookup_attribute in channel_lookups:
            self.table.add_global_secondary_index(
                index_name=lookup_index,
                partition_key=ddb.Attribute(name=lookup_attribute, type=string_type),
            )