Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New batch processing option: 'ThrowOnFullBatchFailure' #646

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 86 additions & 73 deletions docs/snippets/batch/templates/dynamodb.yaml
Original file line number Diff line number Diff line change
@@ -1,105 +1,118 @@
AWSTemplateFormatVersion: '2010-09-09'
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample
Description: Example project demoing DynamoDB Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

Globals:
Function:
Timeout: 5
MemorySize: 256
Runtime: nodejs18.x
Tracing: Active
Timeout: 20
Runtime: dotnet8
MemorySize: 1024
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-ddb
POWERTOOLS_LOG_LEVEL: Debug
POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase
POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
POWERTOOLS_BATCH_PARALLEL_ENABLED: false

Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
Policies:
# Lambda Destinations require additional permissions
# to send failure records from Kinesis/DynamoDB
- Version: '2012-10-17'
Statement:
Effect: 'Allow'
Action:
- sqs:GetQueueAttributes
- sqs:GetQueueUrl
- sqs:SendMessage
Resource: !GetAtt SampleDLQ.Arn
- KMSDecryptPolicy:
KeyId: !Ref CustomerKey
Events:
DynamoDBStream:
Type: DynamoDB
Properties:
Stream: !GetAtt SampleTable.StreamArn
StartingPosition: LATEST
MaximumRetryAttempts: 2
DestinationConfig:
OnFailure:
Destination: !GetAtt SampleDLQ.Arn
FunctionResponseTypes:
- ReportBatchItemFailures
POWERTOOLS_BATCH_PARALLEL_ENABLED : false
POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

SampleDLQ:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey
Resources:

SampleTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
SSESpecification:
SSEEnabled: true
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES

# --------------
# KMS key for encrypted queues
# KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
Version: '2012-10-17'
Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow use of the key
AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
Action: "kms:*"
Resource: "*"
- Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'
Resource: "*"

CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: alias/powertools-batch-sqs-demo
TargetKeyId: !Ref CustomerKey
AliasName: !Sub alias/${AWS::StackName}-kms-key
TargetKeyId: !Ref CustomerKey

# --------------
# Batch Processing for DynamoDb (DDB) Stream
DdbStreamDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey

DdbTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES

DdbStreamBatchProcessorFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
Policies:
- AWSLambdaDynamoDBExecutionRole
- Statement:
- Sid: DlqPermissions
Effect: Allow
Action:
- sqs:SendMessage
- sqs:SendMessageBatch
Resource: !GetAtt DdbStreamDeadLetterQueue.Arn
- Sid: KmsKeyPermissions
Effect: Allow
Action:
- kms:GenerateDataKey
Resource: !GetAtt CustomerKey.Arn
Events:
Stream:
Type: DynamoDB
Properties:
BatchSize: 5
BisectBatchOnFunctionError: true
DestinationConfig:
OnFailure:
Destination: !GetAtt DdbStreamDeadLetterQueue.Arn
Enabled: true
FunctionResponseTypes:
- ReportBatchItemFailures
MaximumRetryAttempts: 2
ParallelizationFactor: 1
StartingPosition: LATEST
Stream: !GetAtt DdbTable.StreamArn

DdbStreamBatchProcessorFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}"
RetentionInDays: 7

Outputs:
DdbTableName:
Description: "DynamoDB Table Name"
Value: !Ref DdbTable
154 changes: 92 additions & 62 deletions docs/snippets/batch/templates/kinesis.yaml
Original file line number Diff line number Diff line change
@@ -1,95 +1,125 @@
AWSTemplateFormatVersion: '2010-09-09'
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample
Description: Example project demoing Kinesis Data Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

Globals:
Function:
Timeout: 5
MemorySize: 256
Runtime: nodejs18.x
Tracing: Active
Timeout: 20
Runtime: dotnet8
MemorySize: 1024
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing
POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-kinesis
POWERTOOLS_LOG_LEVEL: Debug
POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase
POWERTOOLS_LOGGER_CASE: PascalCase
POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
POWERTOOLS_BATCH_PARALLEL_ENABLED: false
POWERTOOLS_BATCH_PARALLEL_ENABLED : false
POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
Policies:
# Lambda Destinations require additional permissions
# to send failure records to DLQ from Kinesis/DynamoDB
- Version: '2012-10-17'
Statement:
Effect: 'Allow'
Action:
- sqs:GetQueueAttributes
- sqs:GetQueueUrl
- sqs:SendMessage
Resource: !GetAtt SampleDLQ.Arn
- KMSDecryptPolicy:
KeyId: !Ref CustomerKey
Events:
KinesisStream:
Type: Kinesis
Properties:
Stream: !GetAtt SampleStream.Arn
BatchSize: 100
StartingPosition: LATEST
MaximumRetryAttempts: 2
DestinationConfig:
OnFailure:
Destination: !GetAtt SampleDLQ.Arn
FunctionResponseTypes:
- ReportBatchItemFailures

SampleDLQ:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey

SampleStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
StreamEncryption:
EncryptionType: KMS
KeyId: alias/aws/kinesis

# --------------
# KMS key for encrypted queues
# KMS key for encrypted messages / records
CustomerKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for encrypted queues
Enabled: true
KeyPolicy:
Version: '2012-10-17'
Version: "2012-10-17"
Statement:
- Sid: Enable IAM User Permissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: Allow use of the key
AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
Action: "kms:*"
Resource: "*"
- Sid: Allow AWS Lambda to use the key
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'
Resource: "*"

CustomerKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: alias/powertools-batch-sqs-demo
TargetKeyId: !Ref CustomerKey
AliasName: !Sub alias/${AWS::StackName}-kms-key
TargetKeyId: !Ref CustomerKey

# --------------
# Batch Processing for Kinesis Data Stream
KinesisStreamDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
KmsMasterKeyId: !Ref CustomerKey

KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
StreamEncryption:
EncryptionType: KMS
KeyId: !Ref CustomerKey

KinesisStreamConsumer:
Type: AWS::Kinesis::StreamConsumer
Properties:
ConsumerName: powertools-dotnet-sample-batch-kds-consumer
StreamARN: !GetAtt KinesisStream.Arn

KinesisBatchProcessorFunction:
Type: AWS::Serverless::Function
Properties:
Policies:
- Statement:
- Sid: KinesisStreamConsumerPermissions
Effect: Allow
Action:
- kinesis:DescribeStreamConsumer
Resource:
- !GetAtt KinesisStreamConsumer.ConsumerARN
- Sid: DlqPermissions
Effect: Allow
Action:
- sqs:SendMessage
- sqs:SendMessageBatch
Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn
- Sid: KmsKeyPermissions
Effect: Allow
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: !GetAtt CustomerKey.Arn
CodeUri: ./src/HelloWorld/
Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
Events:
Kinesis:
Type: Kinesis
Properties:
BatchSize: 5
BisectBatchOnFunctionError: true
DestinationConfig:
OnFailure:
Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn
Enabled: true
FunctionResponseTypes:
- ReportBatchItemFailures
MaximumRetryAttempts: 2
ParallelizationFactor: 1
StartingPosition: LATEST
Stream: !GetAtt KinesisStreamConsumer.ConsumerARN

KinesisBatchProcessorFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}"
RetentionInDays: 7

Outputs:
KinesisStreamArn:
Description: "Kinesis Stream ARN"
Value: !GetAtt KinesisStream.Arn
Loading
Loading