databases.py
1 """DynamoDB tables for message buffering and unified user identity. 2 3 Messages table: from_phone as PK so messages from the same user land in the 4 same shard and are processed together by the tumbling window. 5 6 Users table: canonical user_id as PK with GSIs on wa_phone and ig_id for 7 cross-channel identity resolution. Enables shared AgentCore Memory across 8 WhatsApp and Instagram for the same person. 9 10 Buffering pattern based on: 11 https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat 12 """ 13 14 from constructs import Construct 15 from aws_cdk import ( 16 RemovalPolicy, 17 aws_dynamodb as ddb, 18 ) 19 20 21 class MessageDatabase(Construct): 22 """DynamoDB table with stream for tumbling window message aggregation.""" 23 24 def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: 25 super().__init__(scope, construct_id, **kwargs) 26 27 self.table = ddb.Table( 28 self, 29 "MessagesTable", 30 partition_key=ddb.Attribute(name="from_phone", type=ddb.AttributeType.STRING), 31 sort_key=ddb.Attribute(name="id", type=ddb.AttributeType.STRING), 32 billing_mode=ddb.BillingMode.PAY_PER_REQUEST, 33 removal_policy=RemovalPolicy.DESTROY, 34 stream=ddb.StreamViewType.NEW_IMAGE, 35 time_to_live_attribute="ttl", 36 ) 37 38 39 class UserIdentityDatabase(Construct): 40 """Unified user identity table for cross-channel memory. 41 42 Maps WhatsApp phone numbers and Instagram IDs to a single canonical 43 user_id used as actor_id in AgentCore Memory. When the same person 44 messages from both WhatsApp and Instagram, their conversations share 45 the same long-term memory. 46 """ 47 48 def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: 49 super().__init__(scope, construct_id, **kwargs) 50 51 self.table = ddb.Table( 52 self, 53 "UnifiedUsersTable", 54 partition_key=ddb.Attribute(name="user_id", type=ddb.AttributeType.STRING), 55 billing_mode=ddb.BillingMode.PAY_PER_REQUEST, 56 removal_policy=RemovalPolicy.DESTROY, 57 ) 58 59 # GSI to look up by WhatsApp phone number 60 self.table.add_global_secondary_index( 61 index_name="wa-phone-index", 62 partition_key=ddb.Attribute(name="wa_phone", type=ddb.AttributeType.STRING), 63 ) 64 65 # GSI to look up by Instagram scoped ID 66 self.table.add_global_secondary_index( 67 index_name="ig-id-index", 68 partition_key=ddb.Attribute(name="ig_id", type=ddb.AttributeType.STRING), 69 )