/ 02-multichannel-api-gateway / lambdas / project_lambdas.py
project_lambdas.py
  1  """Lambda constructs: webhook receiver (API GW) + processor (DDB Stream with tumbling window).
  2  
  3  Buffering pattern based on:
  4  https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
  5  """
  6  
  7  from constructs import Construct
  8  from aws_cdk import (
  9      Duration,
 10      aws_lambda as _lambda,
 11      aws_lambda_event_sources as event_sources,
 12      aws_iam as iam,
 13      aws_dynamodb as ddb,
 14      aws_s3 as s3,
 15  )
 16  
 17  
 18  class ProjectLambdas(Construct):
 19      """Webhook receiver (API GW) + Message processor (DDB Stream tumbling window)."""
 20  
 21      def __init__(
 22          self,
 23          scope: Construct,
 24          construct_id: str,
 25          table: ddb.Table,
 26          users_table: ddb.Table,
 27          bucket: s3.Bucket,
 28          common_layer: _lambda.LayerVersion,
 29          secret_arn: str,
 30          ig_secret_arn: str,
 31          agent_runtime_arn: str,
 32          buffer_seconds: int = 10,
 33          **kwargs,
 34      ) -> None:
 35          super().__init__(scope, construct_id, **kwargs)
 36  
 37          default_env = {
 38              "TABLE_NAME": table.table_name,
 39              "USERS_TABLE_NAME": users_table.table_name,
 40              "S3_BUCKET": bucket.bucket_name,
 41          }
 42  
 43          # --- Webhook receiver: API GW -> save to DDB ---
 44          self.webhook_receiver = _lambda.Function(
 45              self,
 46              "WebhookReceiver",
 47              runtime=_lambda.Runtime.PYTHON_3_12,
 48              handler="lambda_function.lambda_handler",
 49              code=_lambda.Code.from_asset("lambdas/code/webhook_receiver"),
 50              timeout=Duration.minutes(2),
 51              memory_size=256,
 52              layers=[common_layer],
 53              environment={
 54                  **default_env,
 55                  "SECRET_ARN": secret_arn,
 56                  "IG_SECRET_ARN": ig_secret_arn,
 57              },
 58          )
 59  
 60          table.grant_read_write_data(self.webhook_receiver)
 61          users_table.grant_read_write_data(self.webhook_receiver)
 62          bucket.grant_read_write(self.webhook_receiver)
 63  
 64          self.webhook_receiver.add_to_role_policy(
 65              iam.PolicyStatement(
 66                  actions=["secretsmanager:GetSecretValue"],
 67                  resources=[secret_arn, ig_secret_arn],
 68              )
 69          )
 70  
 71          # --- Message processor: DDB Stream with tumbling window -> AgentCore ---
 72          self.message_processor = _lambda.Function(
 73              self,
 74              "MessageProcessor",
 75              runtime=_lambda.Runtime.PYTHON_3_12,
 76              handler="lambda_function.lambda_handler",
 77              code=_lambda.Code.from_asset("lambdas/code/message_processor"),
 78              timeout=Duration.minutes(5),
 79              memory_size=512,
 80              layers=[common_layer],
 81              environment={
 82                  **default_env,
 83                  "AGENT_ARN": agent_runtime_arn,
 84                  "IG_SECRET_ARN": ig_secret_arn,
 85              },
 86          )
 87  
 88          buffer_duration = Duration.seconds(buffer_seconds)
 89          self.message_processor.add_event_source(
 90              event_sources.DynamoEventSource(
 91                  table,
 92                  starting_position=_lambda.StartingPosition.TRIM_HORIZON,
 93                  tumbling_window=buffer_duration,
 94                  batch_size=1000,
 95                  max_batching_window=buffer_duration,
 96              )
 97          )
 98  
 99          table.grant_read_data(self.message_processor)
100          users_table.grant_read_write_data(self.message_processor)
101          bucket.grant_read_write(self.message_processor)
102  
103          self.message_processor.add_to_role_policy(
104              iam.PolicyStatement(
105                  actions=["secretsmanager:GetSecretValue"],
106                  resources=[ig_secret_arn],
107              )
108          )
109  
110          self.message_processor.add_to_role_policy(
111              iam.PolicyStatement(
112                  actions=[
113                      "bedrock-agentcore:InvokeAgentRuntime",
114                      "bedrock-agentcore:InvokeAgentRuntimeForUser",
115                  ],
116                  resources=["*"],
117              )
118          )
119  
120          self.message_processor.add_to_role_policy(
121              iam.PolicyStatement(
122                  actions=[
123                      "transcribe:StartTranscriptionJob",
124                      "transcribe:GetTranscriptionJob",
125                  ],
126                  resources=["*"],
127              )
128          )