project_lambdas.py
"""Lambda constructs: webhook receiver (API GW) + processor (DDB Stream with tumbling window).

Buffering pattern based on:
https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat
"""

from constructs import Construct
from aws_cdk import (
    Duration,
    aws_lambda as _lambda,
    aws_lambda_event_sources as event_sources,
    aws_iam as iam,
    aws_dynamodb as ddb,
    aws_s3 as s3,
)


class ProjectLambdas(Construct):
    """Webhook receiver (API GW) + Message processor (DDB Stream tumbling window).

    Creates two Lambda functions and wires up their permissions:

    * ``webhook_receiver`` -- read/write on both tables and the bucket, plus
      ``GetSecretValue`` on ``secret_arn`` and ``ig_secret_arn``.
    * ``message_processor`` -- triggered by the stream of ``table``; read-only
      on ``table``, read/write on ``users_table`` and the bucket,
      ``GetSecretValue`` on ``ig_secret_arn``, permission to invoke the
      AgentCore runtime identified by ``agent_runtime_arn``, and permission to
      start/query Amazon Transcribe jobs.

    The API Gateway integration for the receiver is not created here; callers
    are expected to attach ``self.webhook_receiver`` themselves.
    """

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        table: ddb.Table,
        users_table: ddb.Table,
        bucket: s3.Bucket,
        common_layer: _lambda.LayerVersion,
        secret_arn: str,
        ig_secret_arn: str,
        agent_runtime_arn: str,
        buffer_seconds: int = 10,
        **kwargs,
    ) -> None:
        """Define both Lambda functions, their event source and IAM grants.

        Args:
            scope: Parent construct.
            construct_id: Logical id of this construct within ``scope``.
            table: Messages table; its stream is the processor's event source.
            users_table: Users table, read/write for both functions.
            bucket: Bucket exposed to both functions as ``S3_BUCKET``.
            common_layer: Layer attached to both functions.
            secret_arn: Secret readable by the webhook receiver only
                (exported as ``SECRET_ARN``).
            ig_secret_arn: Secret readable by both functions
                (exported as ``IG_SECRET_ARN``).
            agent_runtime_arn: ARN of the AgentCore runtime the processor
                invokes (exported as ``AGENT_ARN``).
            buffer_seconds: Length in seconds of both the tumbling window and
                the maximum batching window of the stream event source.
            **kwargs: Forwarded unchanged to ``Construct.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Environment shared by both functions.
        default_env = {
            "TABLE_NAME": table.table_name,
            "USERS_TABLE_NAME": users_table.table_name,
            "S3_BUCKET": bucket.bucket_name,
        }

        # --- Webhook receiver: API GW -> save to DDB ---
        self.webhook_receiver = _lambda.Function(
            self,
            "WebhookReceiver",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="lambda_function.lambda_handler",
            code=_lambda.Code.from_asset("lambdas/code/webhook_receiver"),
            timeout=Duration.minutes(2),
            memory_size=256,
            layers=[common_layer],
            environment={
                **default_env,
                "SECRET_ARN": secret_arn,
                "IG_SECRET_ARN": ig_secret_arn,
            },
        )

        table.grant_read_write_data(self.webhook_receiver)
        users_table.grant_read_write_data(self.webhook_receiver)
        bucket.grant_read_write(self.webhook_receiver)

        self.webhook_receiver.add_to_role_policy(
            iam.PolicyStatement(
                actions=["secretsmanager:GetSecretValue"],
                resources=[secret_arn, ig_secret_arn],
            )
        )

        # --- Message processor: DDB Stream with tumbling window -> AgentCore ---
        self.message_processor = _lambda.Function(
            self,
            "MessageProcessor",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="lambda_function.lambda_handler",
            code=_lambda.Code.from_asset("lambdas/code/message_processor"),
            timeout=Duration.minutes(5),
            memory_size=512,
            layers=[common_layer],
            environment={
                **default_env,
                "AGENT_ARN": agent_runtime_arn,
                "IG_SECRET_ARN": ig_secret_arn,
            },
        )

        # The same duration drives the tumbling window and the batching
        # window, so records are buffered for `buffer_seconds` before the
        # processor is invoked.
        buffer_duration = Duration.seconds(buffer_seconds)
        self.message_processor.add_event_source(
            event_sources.DynamoEventSource(
                table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                tumbling_window=buffer_duration,
                batch_size=1000,
                max_batching_window=buffer_duration,
            )
        )

        table.grant_read_data(self.message_processor)
        users_table.grant_read_write_data(self.message_processor)
        bucket.grant_read_write(self.message_processor)

        self.message_processor.add_to_role_policy(
            iam.PolicyStatement(
                actions=["secretsmanager:GetSecretValue"],
                resources=[ig_secret_arn],
            )
        )

        # Least privilege: only the configured runtime (and its sub-resources,
        # e.g. runtime endpoints) may be invoked, rather than every AgentCore
        # runtime in the account.
        self.message_processor.add_to_role_policy(
            iam.PolicyStatement(
                actions=[
                    "bedrock-agentcore:InvokeAgentRuntime",
                    "bedrock-agentcore:InvokeAgentRuntimeForUser",
                ],
                resources=[agent_runtime_arn, f"{agent_runtime_arn}/*"],
            )
        )

        # No narrower ARN is known to this construct for transcription jobs,
        # so the wildcard resource is kept.
        self.message_processor.add_to_role_policy(
            iam.PolicyStatement(
                actions=[
                    "transcribe:StartTranscriptionJob",
                    "transcribe:GetTranscriptionJob",
                ],
                resources=["*"],
            )
        )