project_lambdas.py
  1  """Lambda constructs: receiver (SNS) + processor (DDB Stream with tumbling window).
  2  
  3  Buffering pattern based on:
  4  https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
  5  """
  6  
  7  from constructs import Construct
  8  from aws_cdk import (
  9      Duration,
 10      aws_lambda as _lambda,
 11      aws_lambda_event_sources as event_sources,
 12      aws_sns_subscriptions as sns_subs,
 13      aws_sns as sns,
 14      aws_dynamodb as ddb,
 15      aws_s3 as s3,
 16      aws_iam as iam,
 17  )
 18  
 19  
 20  class ProjectLambdas(Construct):
 21      """Receiver Lambda (SNS) + Processor Lambda (DDB Stream tumbling window)."""
 22  
 23      def __init__(
 24          self,
 25          scope: Construct,
 26          construct_id: str,
 27          topic: sns.Topic,
 28          table: ddb.Table,
 29          bucket: s3.Bucket,
 30          agent_runtime_arn: str,
 31          buffer_seconds: int = 20,
 32          **kwargs,
 33      ) -> None:
 34          super().__init__(scope, construct_id, **kwargs)
 35  
 36          # --- Receiver Lambda (SNS -> save to DDB) ---
 37          self.whatsapp_handler = _lambda.Function(
 38              self,
 39              "WhatsAppHandler",
 40              runtime=_lambda.Runtime.PYTHON_3_11,
 41              handler="lambda_function.lambda_handler",
 42              code=_lambda.Code.from_asset("lambdas/code/whatsapp_handler"),
 43              timeout=Duration.minutes(5),
 44              memory_size=512,
 45              environment={
 46                  "TABLE_NAME": table.table_name,
 47                  "S3_BUCKET": bucket.bucket_name,
 48                  "ATTACHMENT_PREFIX": "images/",
 49                  "VOICE_PREFIX": "voice/",
 50                  "VIDEO_PREFIX": "video/",
 51                  "DOCUMENT_PREFIX": "documents/",
 52              },
 53          )
 54  
 55          topic.add_subscription(sns_subs.LambdaSubscription(self.whatsapp_handler))
 56          table.grant_read_write_data(self.whatsapp_handler)
 57          bucket.grant_read_write(self.whatsapp_handler)
 58  
 59          self.whatsapp_handler.add_to_role_policy(
 60              iam.PolicyStatement(
 61                  actions=[
 62                      "social-messaging:SendWhatsAppMessage",
 63                      "social-messaging:GetWhatsAppMessageMedia",
 64                  ],
 65                  resources=["*"],
 66              )
 67          )
 68  
 69          # --- Processor Lambda (DDB Stream with tumbling window -> AgentCore) ---
 70          self.message_processor = _lambda.Function(
 71              self,
 72              "MessageProcessor",
 73              runtime=_lambda.Runtime.PYTHON_3_11,
 74              handler="lambda_function.lambda_handler",
 75              code=_lambda.Code.from_asset("lambdas/code/message_processor"),
 76              timeout=Duration.minutes(5),
 77              memory_size=512,
 78              environment={
 79                  "TABLE_NAME": table.table_name,
 80                  "AGENT_ARN": agent_runtime_arn,
 81                  "S3_BUCKET": bucket.bucket_name,
 82              },
 83          )
 84  
 85          buffer_duration = Duration.seconds(buffer_seconds)
 86          self.message_processor.add_event_source(
 87              event_sources.DynamoEventSource(
 88                  table,
 89                  starting_position=_lambda.StartingPosition.TRIM_HORIZON,
 90                  tumbling_window=buffer_duration,
 91                  batch_size=1000,
 92                  max_batching_window=buffer_duration,
 93              )
 94          )
 95  
 96          table.grant_read_data(self.message_processor)
 97          bucket.grant_read(self.message_processor)
 98  
 99          self.message_processor.add_to_role_policy(
100              iam.PolicyStatement(
101                  actions=[
102                      "bedrock-agentcore:InvokeAgentRuntime",
103                      "bedrock-agentcore:InvokeAgentRuntimeForUser",
104                  ],
105                  resources=["*"],
106              )
107          )
108  
109          self.message_processor.add_to_role_policy(
110              iam.PolicyStatement(
111                  actions=[
112                      "social-messaging:SendWhatsAppMessage",
113                  ],
114                  resources=["*"],
115              )
116          )
117  
118          self.message_processor.add_to_role_policy(
119              iam.PolicyStatement(
120                  actions=[
121                      "transcribe:StartTranscriptionJob",
122                      "transcribe:GetTranscriptionJob",
123                  ],
124                  resources=["*"],
125              )
126          )