# project_lambdas.py
"""CDK construct wiring the WhatsApp receiver and processor Lambdas.

The receiver is subscribed to an SNS topic; the processor is fed by a
DynamoDB stream event source configured with a tumbling window.

Buffering pattern based on:
https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
"""

from constructs import Construct
from aws_cdk import (
    Duration,
    aws_lambda as _lambda,
    aws_lambda_event_sources as event_sources,
    aws_sns_subscriptions as sns_subs,
    aws_sns as sns,
    aws_dynamodb as ddb,
    aws_s3 as s3,
    aws_iam as iam,
)


class ProjectLambdas(Construct):
    """Receiver Lambda (SNS) plus processor Lambda (DDB stream, tumbling window).

    After construction the two functions are exposed as:

    * ``whatsapp_handler`` -- subscribed to ``topic``; read/write access to
      ``table`` and ``bucket``.
    * ``message_processor`` -- triggered by the stream of ``table``; read-only
      access to ``table`` and ``bucket``.
    """

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        topic: sns.Topic,
        table: ddb.Table,
        bucket: s3.Bucket,
        agent_runtime_arn: str,
        buffer_seconds: int = 20,
        **kwargs,
    ) -> None:
        """Create both Lambdas and attach their triggers and permissions.

        Args:
            scope: Parent construct.
            construct_id: Identifier of this construct within ``scope``.
            topic: SNS topic the receiver Lambda subscribes to.
            table: DynamoDB table shared by both Lambdas; it is also the
                source of the processor's stream events (presumably the
                table has a stream enabled -- verify against the caller).
            bucket: S3 bucket whose name is passed to both Lambdas.
            agent_runtime_arn: Exposed to the processor as ``AGENT_ARN``.
            buffer_seconds: Length in seconds used for both the tumbling
                window and the maximum batching window of the stream
                event source.
            **kwargs: Forwarded unchanged to ``Construct.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # --- Receiver Lambda (SNS -> save to DDB) ---
        receiver_env = {
            "TABLE_NAME": table.table_name,
            "S3_BUCKET": bucket.bucket_name,
            "ATTACHMENT_PREFIX": "images/",
            "VOICE_PREFIX": "voice/",
            "VIDEO_PREFIX": "video/",
            "DOCUMENT_PREFIX": "documents/",
        }
        self.whatsapp_handler = self._python_function(
            "WhatsAppHandler",
            "lambdas/code/whatsapp_handler",
            receiver_env,
        )

        receiver_subscription = sns_subs.LambdaSubscription(self.whatsapp_handler)
        topic.add_subscription(receiver_subscription)
        table.grant_read_write_data(self.whatsapp_handler)
        bucket.grant_read_write(self.whatsapp_handler)

        self._allow_on_any_resource(
            self.whatsapp_handler,
            [
                "social-messaging:SendWhatsAppMessage",
                "social-messaging:GetWhatsAppMessageMedia",
            ],
        )

        # --- Processor Lambda (DDB Stream with tumbling window -> AgentCore) ---
        processor_env = {
            "TABLE_NAME": table.table_name,
            "AGENT_ARN": agent_runtime_arn,
            "S3_BUCKET": bucket.bucket_name,
        }
        self.message_processor = self._python_function(
            "MessageProcessor",
            "lambdas/code/message_processor",
            processor_env,
        )

        # One Duration drives both the tumbling window and the batching window.
        window = Duration.seconds(buffer_seconds)
        stream_source = event_sources.DynamoEventSource(
            table,
            starting_position=_lambda.StartingPosition.TRIM_HORIZON,
            tumbling_window=window,
            batch_size=1000,
            max_batching_window=window,
        )
        self.message_processor.add_event_source(stream_source)

        table.grant_read_data(self.message_processor)
        bucket.grant_read(self.message_processor)

        # NOTE(review): every statement below targets resources=["*"];
        # agent_runtime_arn is in scope if the AgentCore statement should be
        # narrowed -- confirm the required ARN pattern before changing.
        processor_action_groups = (
            [
                "bedrock-agentcore:InvokeAgentRuntime",
                "bedrock-agentcore:InvokeAgentRuntimeForUser",
            ],
            [
                "social-messaging:SendWhatsAppMessage",
            ],
            [
                "transcribe:StartTranscriptionJob",
                "transcribe:GetTranscriptionJob",
            ],
        )
        # One policy statement per group, added in the listed order.
        for action_group in processor_action_groups:
            self._allow_on_any_resource(self.message_processor, action_group)

    def _python_function(
        self,
        function_id: str,
        asset_path: str,
        environment: dict,
    ) -> _lambda.Function:
        """Build a Lambda with the settings shared by both functions.

        Python 3.11 runtime, ``lambda_function.lambda_handler`` entry point,
        5-minute timeout and 512 MB of memory; only the construct id, the
        code asset directory and the environment differ per function.
        """
        return _lambda.Function(
            self,
            function_id,
            runtime=_lambda.Runtime.PYTHON_3_11,
            handler="lambda_function.lambda_handler",
            code=_lambda.Code.from_asset(asset_path),
            timeout=Duration.minutes(5),
            memory_size=512,
            environment=environment,
        )

    @staticmethod
    def _allow_on_any_resource(function: _lambda.Function, actions: list) -> None:
        """Add one statement allowing ``actions`` on ``*`` to the function's role."""
        statement = iam.PolicyStatement(actions=actions, resources=["*"])
        function.add_to_role_policy(statement)