From c4e6b192c3658cbcc3f458a579a0752153e3c201 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Tue, 25 Jul 2023 12:14:51 +0200 Subject: [PATCH] feat(batch): add batch processing utility (#1625) * chore: init workspace * chore: init workspace * feat(batch): Implementation of base batch processing classes (#1588) * chore: init workspace * chore: init workspace * Initial base class implementation * Added BatchProcessor implementation, attempted fix for async * Added unit tests * Refactoring unit tests * Lint fix, updated docstrings * Added response and identifier typings * test(idempotency): improve integration tests for utility (#1591) * docs: new name * chore: rename e2e files * tests(idempotency): expand integration tests * chore(idempotency): remove unreachable code * Removed unnecessary type casting * Moved exports for handlers and factories * Updated imports, refactored randomization in factories * Refactored EventType to be const instead of enum * Refactored and added documentation for errors * Removed debugging line * chore(ci): add canary to layer deployment (#1593) * docs(idempotency): write utility docs (#1592) * docs: base docs * wip * chore: added paths to snippets tsconfig * chore: added page to docs menu * docs(idempotency): utility docs * highlights * chore: remove CDK mention * build(internal): bump semver from 5.7.1 to 5.7.2 (#1594) Bumps [semver](https://github.com/npm/node-semver) from 5.7.1 to 5.7.2. - [Release notes](https://github.com/npm/node-semver/releases) - [Changelog](https://github.com/npm/node-semver/blob/v5.7.2/CHANGELOG.md) - [Commits](https://github.com/npm/node-semver/compare/v5.7.1...v5.7.2) --- updated-dependencies: - dependency-name: semver dependency-type: indirect ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore(idempotency): mark the utility ready public beta (#1595) * chore(idempotency): mark utility as public beta * chore: manually increment version in commons * docs(internal): update AWS SDK links to new docs (#1597) * chore(maintenance): remove parameters utility from layer bundling and layers e2e tests (#1599) * remove parameter from e2e tests * remove parameters from canary stack as well * chore(release): v1.11.1 [skip ci] * fix canary deploy in ci with correct workspace name (#1601) * chore: update layer ARN on documentation --------- Signed-off-by: dependabot[bot] Co-authored-by: Andrea Amorosi Co-authored-by: Alexander Schueren Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] Co-authored-by: Release bot[bot] * feat(batch): Batch processing wrapper function (#1605) * Refactored some types, added function wrapper and base test * Added record check and tests, renamed factories * Refactored type check logic in function * Refactor test to remove error ignore * feat(batch): Implement SQS FIFO processor class (#1606) * Added SQS FIFO processor and unit tests * Added docstring for pbatch processing function * feat(batch): Support for Lambda context access in batch processing (#1609) * Added types and parameter for lambda context, added unit tests * Refactor parameter checking * Added test for malformed context handling * docs: created utility docs * docs: fixed white spaces * feat(batch): add async processor (#1616) * feat(batch): add async processor * tests: improved unit tests * chore: removed docstring + edited test handler * chore: fix typos * docs: added README * chore: added package to beta release * chore: marked package as public * chore: added new batch page to docs * chore: added utility to lerna workspace * chore: added utility to main readme * chore: 
added utility to CI --------- Signed-off-by: dependabot[bot] Co-authored-by: Erika Yao <71943596+erikayao93@users.noreply.github.com> Co-authored-by: Alexander Schueren Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] Co-authored-by: Release bot[bot] --- .github/scripts/release_patch_package_json.js | 5 +- ...sable-run-linting-check-and-unit-tests.yml | 4 +- README.md | 1 + docs/snippets/batch/accessLambdaContext.ts | 37 +++ .../snippets/batch/accessProcessedMessages.ts | 38 +++ docs/snippets/batch/customPartialProcessor.ts | 98 ++++++ docs/snippets/batch/extendingFailure.ts | 53 ++++ docs/snippets/batch/gettingStartedAsync.ts | 31 ++ .../batch/gettingStartedDynamoDBStreams.ts | 35 +++ docs/snippets/batch/gettingStartedKinesis.ts | 30 ++ docs/snippets/batch/gettingStartedSQS.ts | 33 ++ docs/snippets/batch/gettingStartedSQSFifo.ts | 31 ++ .../samples/sampleDynamoDBStreamsEvent.json | 50 +++ .../sampleDynamoDBStreamsResponse.json | 7 + .../batch/samples/sampleKinesisEvent.json | 36 +++ .../batch/samples/sampleKinesisResponse.json | 7 + .../batch/samples/sampleSQSEvent.json | 36 +++ .../batch/samples/sampleSQSResponse.json | 7 + .../batch/templates/sam/dynamodb.yaml | 66 ++++ .../snippets/batch/templates/sam/kinesis.yaml | 56 ++++ docs/snippets/batch/templates/sam/sqs.yaml | 43 +++ docs/snippets/batch/testingYourCode.ts | 32 ++ docs/snippets/package.json | 7 +- docs/snippets/tsconfig.json | 23 +- docs/utilities/batch.md | 269 ++++++++++++++++ lerna.json | 1 + mkdocs.yml | 1 + package-lock.json | 17 +- package.json | 3 +- packages/batch/README.md | 263 ++++++++++++++++ packages/batch/jest.config.js | 28 ++ packages/batch/package.json | 54 ++++ packages/batch/src/AsyncBatchProcessor.ts | 31 ++ .../batch/src/BasePartialBatchProcessor.ts | 157 ++++++++++ packages/batch/src/BasePartialProcessor.ts | 168 ++++++++++ packages/batch/src/BatchProcessor.ts | 31 ++ 
packages/batch/src/SqsFifoPartialProcessor.ts | 68 ++++ .../batch/src/asyncProcessPartialResponse.ts | 38 +++ packages/batch/src/constants.ts | 25 ++ packages/batch/src/errors.ts | 49 +++ packages/batch/src/index.ts | 10 + packages/batch/src/processPartialResponse.ts | 38 +++ packages/batch/src/types.ts | 37 +++ packages/batch/tests/helpers/factories.ts | 75 +++++ packages/batch/tests/helpers/handlers.ts | 109 +++++++ .../helpers/populateEnvironmentVariables.ts | 12 + .../tests/unit/AsyncBatchProcessor.test.ts | 296 ++++++++++++++++++ .../batch/tests/unit/BatchProcessor.test.ts | 285 +++++++++++++++++ .../unit/SqsFifoPartialProcessor.test.ts | 59 ++++ .../unit/asyncProcessPartialResponse.test.ts | 231 ++++++++++++++ .../tests/unit/processPartialResponse.test.ts | 209 +++++++++++++ packages/batch/tsconfig-dev.json | 11 + packages/batch/tsconfig.es.json | 11 + packages/batch/tsconfig.json | 29 ++ packages/batch/typedoc.json | 9 + 55 files changed, 3367 insertions(+), 23 deletions(-) create mode 100644 docs/snippets/batch/accessLambdaContext.ts create mode 100644 docs/snippets/batch/accessProcessedMessages.ts create mode 100644 docs/snippets/batch/customPartialProcessor.ts create mode 100644 docs/snippets/batch/extendingFailure.ts create mode 100644 docs/snippets/batch/gettingStartedAsync.ts create mode 100644 docs/snippets/batch/gettingStartedDynamoDBStreams.ts create mode 100644 docs/snippets/batch/gettingStartedKinesis.ts create mode 100644 docs/snippets/batch/gettingStartedSQS.ts create mode 100644 docs/snippets/batch/gettingStartedSQSFifo.ts create mode 100644 docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json create mode 100644 docs/snippets/batch/samples/sampleDynamoDBStreamsResponse.json create mode 100644 docs/snippets/batch/samples/sampleKinesisEvent.json create mode 100644 docs/snippets/batch/samples/sampleKinesisResponse.json create mode 100644 docs/snippets/batch/samples/sampleSQSEvent.json create mode 100644 
docs/snippets/batch/samples/sampleSQSResponse.json create mode 100644 docs/snippets/batch/templates/sam/dynamodb.yaml create mode 100644 docs/snippets/batch/templates/sam/kinesis.yaml create mode 100644 docs/snippets/batch/templates/sam/sqs.yaml create mode 100644 docs/snippets/batch/testingYourCode.ts create mode 100644 docs/utilities/batch.md create mode 100644 packages/batch/README.md create mode 100644 packages/batch/jest.config.js create mode 100644 packages/batch/package.json create mode 100644 packages/batch/src/AsyncBatchProcessor.ts create mode 100644 packages/batch/src/BasePartialBatchProcessor.ts create mode 100644 packages/batch/src/BasePartialProcessor.ts create mode 100644 packages/batch/src/BatchProcessor.ts create mode 100644 packages/batch/src/SqsFifoPartialProcessor.ts create mode 100644 packages/batch/src/asyncProcessPartialResponse.ts create mode 100644 packages/batch/src/constants.ts create mode 100644 packages/batch/src/errors.ts create mode 100644 packages/batch/src/index.ts create mode 100644 packages/batch/src/processPartialResponse.ts create mode 100644 packages/batch/src/types.ts create mode 100644 packages/batch/tests/helpers/factories.ts create mode 100644 packages/batch/tests/helpers/handlers.ts create mode 100644 packages/batch/tests/helpers/populateEnvironmentVariables.ts create mode 100644 packages/batch/tests/unit/AsyncBatchProcessor.test.ts create mode 100644 packages/batch/tests/unit/BatchProcessor.test.ts create mode 100644 packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts create mode 100644 packages/batch/tests/unit/asyncProcessPartialResponse.test.ts create mode 100644 packages/batch/tests/unit/processPartialResponse.test.ts create mode 100644 packages/batch/tsconfig-dev.json create mode 100644 packages/batch/tsconfig.es.json create mode 100644 packages/batch/tsconfig.json create mode 100644 packages/batch/typedoc.json diff --git a/.github/scripts/release_patch_package_json.js 
b/.github/scripts/release_patch_package_json.js index 0108d3aa36..73c1758246 100644 --- a/.github/scripts/release_patch_package_json.js +++ b/.github/scripts/release_patch_package_json.js @@ -18,7 +18,10 @@ if (process.argv.length < 3) { const basePath = resolve(process.argv[2]); const packageJsonPath = join(basePath, 'package.json'); const alphaPackages = []; -const betaPackages = ['@aws-lambda-powertools/idempotency']; +const betaPackages = [ + '@aws-lambda-powertools/idempotency', + '@aws-lambda-powertools/batch', +]; (() => { try { diff --git a/.github/workflows/reusable-run-linting-check-and-unit-tests.yml b/.github/workflows/reusable-run-linting-check-and-unit-tests.yml index 8782e5999e..6c350943b1 100644 --- a/.github/workflows/reusable-run-linting-check-and-unit-tests.yml +++ b/.github/workflows/reusable-run-linting-check-and-unit-tests.yml @@ -27,9 +27,9 @@ jobs: with: nodeVersion: ${{ matrix.version }} - name: Run linting - run: npm run lint -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency + run: npm run lint -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency -w packages/batch - name: Run unit tests - run: npm t -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency + run: npm t -w packages/commons -w packages/logger -w packages/tracer -w packages/metrics -w packages/parameters -w packages/idempotency -w packages/batch check-examples: runs-on: ubuntu-latest env: diff --git a/README.md b/README.md index 03746c86a5..9b7168ae19 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ You can use the library in both TypeScript and JavaScript code bases. 
* **[Metrics](https://docs.powertools.aws.dev/lambda-typescript/latest/core/metrics/)** - Custom Metrics created asynchronously via CloudWatch Embedded Metric Format (EMF) * **[Parameters](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/parameters/)** - High-level functions to retrieve one or more parameters from AWS SSM Parameter Store, AWS Secrets Manager, AWS AppConfig, and Amazon DynamoDB * **[Idempotency (beta)](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/idempotency/)** - Class method decorator, Middy middleware, and function wrapper to make your Lambda functions idempotent and prevent duplicate execution based on payload content +* **[Batch Processing (beta)](https://docs.powertools.aws.dev/lambda-typescript/latest/utilities/batch/)** - Utility to handle partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. ## Getting started diff --git a/docs/snippets/batch/accessLambdaContext.ts b/docs/snippets/batch/accessLambdaContext.ts new file mode 100644 index 0000000000..0f8aa89409 --- /dev/null +++ b/docs/snippets/batch/accessLambdaContext.ts @@ -0,0 +1,37 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord, lambdaContext?: Context): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } + if (lambdaContext) { + logger.info('Remaining time', { + time: lambdaContext.getRemainingTimeInMillis(), + }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, 
processor, { + context, + }); +}; diff --git a/docs/snippets/batch/accessProcessedMessages.ts b/docs/snippets/batch/accessProcessedMessages.ts new file mode 100644 index 0000000000..6dbd338c7b --- /dev/null +++ b/docs/snippets/batch/accessProcessedMessages.ts @@ -0,0 +1,38 @@ +import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + const batch = event.Records; + + processor.register(batch, recordHandler, { context }); + const processedMessages = processor.process(); + + for (const message of processedMessages) { + const status: 'success' | 'fail' = message[0]; + const record = message[2]; + + logger.info('Processed record', { status, record }); + } + + return processor.response(); +}; diff --git a/docs/snippets/batch/customPartialProcessor.ts b/docs/snippets/batch/customPartialProcessor.ts new file mode 100644 index 0000000000..d44b99a602 --- /dev/null +++ b/docs/snippets/batch/customPartialProcessor.ts @@ -0,0 +1,98 @@ +import { randomInt } from 'node:crypto'; +import { + DynamoDBClient, + BatchWriteItemCommand, +} from '@aws-sdk/client-dynamodb'; +import { marshall } from '@aws-sdk/util-dynamodb'; +import { + BasePartialProcessor, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import type { + SuccessResponse, + FailureResponse, + EventSourceType, +} from '@aws-lambda-powertools/batch'; +import type { SQSEvent, Context, SQSBatchResponse } from 'aws-lambda'; + +const tableName = process.env.TABLE_NAME || 
'table-not-found'; + +class MyPartialProcessor extends BasePartialProcessor { + #tableName: string; + #client?: DynamoDBClient; + + public constructor(tableName: string) { + super(); + this.#tableName = tableName; + } + + public async asyncProcessRecord( + _record: EventSourceType + ): Promise { + throw new Error('Not implemented'); + } + + /** + * It's called once, **after** processing the batch. + * + * Here we are writing all the processed messages to DynamoDB. + */ + public clean(): void { + // We know that the client is defined because clean() is called after prepare() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.#client!.send( + new BatchWriteItemCommand({ + RequestItems: { + [this.#tableName]: this.successMessages.map((message) => ({ + PutRequest: { + Item: marshall(message), + }, + })), + }, + }) + ); + } + + /** + * It's called once, **before** processing the batch. + * + * It initializes a new client and cleans up any existing data. + */ + public prepare(): void { + this.#client = new DynamoDBClient({}); + this.successMessages = []; + } + + /** + * It handles how your record is processed. + * + * Here we are keeping the status of each run, `this.handler` is + * the function that is passed when calling `processor.register()`. 
+ */ + public processRecord( + record: EventSourceType + ): SuccessResponse | FailureResponse { + try { + const result = this.handler(record); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } + } +} + +const processor = new MyPartialProcessor(tableName); + +const recordHandler = (): number => { + return Math.floor(randomInt(1, 10)); +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/extendingFailure.ts b/docs/snippets/batch/extendingFailure.ts new file mode 100644 index 0000000000..ab1ef530f9 --- /dev/null +++ b/docs/snippets/batch/extendingFailure.ts @@ -0,0 +1,53 @@ +import { Metrics, MetricUnits } from '@aws-lambda-powertools/metrics'; +import { + BatchProcessor, + EventType, + FailureResponse, + EventSourceType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +class MyProcessor extends BatchProcessor { + #metrics: Metrics; + + public constructor(eventType: keyof typeof EventType) { + super(eventType); + this.#metrics = new Metrics({ namespace: 'test' }); + } + + public failureHandler( + record: EventSourceType, + error: Error + ): FailureResponse { + this.#metrics.addMetric('BatchRecordFailures', MetricUnits.Count, 1); + + return super.failureHandler(record, error); + } +} + +const processor = new MyProcessor(EventType.SQS); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return 
processPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/gettingStartedAsync.ts b/docs/snippets/batch/gettingStartedAsync.ts new file mode 100644 index 0000000000..0080752026 --- /dev/null +++ b/docs/snippets/batch/gettingStartedAsync.ts @@ -0,0 +1,31 @@ +import { + AsyncBatchProcessor, + EventType, + asyncProcessPartialResponse, +} from '@aws-lambda-powertools/batch'; +import axios from 'axios'; // axios is an external dependency +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new AsyncBatchProcessor(EventType.SQS); + +const recordHandler = async (record: SQSRecord): Promise => { + const res = await axios.post('https://httpbin.org/anything', { + message: record.body, + }); + + return res.status; +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return await asyncProcessPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/gettingStartedDynamoDBStreams.ts b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts new file mode 100644 index 0000000000..4d1842bcec --- /dev/null +++ b/docs/snippets/batch/gettingStartedDynamoDBStreams.ts @@ -0,0 +1,35 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + DynamoDBStreamEvent, + DynamoDBRecord, + Context, + DynamoDBBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.DynamoDBStreams); +const logger = new Logger(); + +const recordHandler = (record: DynamoDBRecord): void => { + if (record.dynamodb && record.dynamodb.NewImage) { + logger.info('Processing record', { record: record.dynamodb.NewImage }); + const message = record.dynamodb.NewImage.Message.S; + if (message) { + const payload = JSON.parse(message); + logger.info('Processed item', { item: 
payload }); + } + } +}; + +export const handler = async ( + event: DynamoDBStreamEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/gettingStartedKinesis.ts b/docs/snippets/batch/gettingStartedKinesis.ts new file mode 100644 index 0000000000..eb1c8a8810 --- /dev/null +++ b/docs/snippets/batch/gettingStartedKinesis.ts @@ -0,0 +1,30 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + KinesisStreamEvent, + KinesisStreamRecord, + Context, + KinesisStreamBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.KinesisDataStreams); +const logger = new Logger(); + +const recordHandler = (record: KinesisStreamRecord): void => { + logger.info('Processing record', { record: record.kinesis.data }); + const payload = JSON.parse(record.kinesis.data); + logger.info('Processed item', { item: payload }); +}; + +export const handler = async ( + event: KinesisStreamEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/gettingStartedSQS.ts b/docs/snippets/batch/gettingStartedSQS.ts new file mode 100644 index 0000000000..3ee6a3fa56 --- /dev/null +++ b/docs/snippets/batch/gettingStartedSQS.ts @@ -0,0 +1,33 @@ +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed 
item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; +export { processor }; diff --git a/docs/snippets/batch/gettingStartedSQSFifo.ts b/docs/snippets/batch/gettingStartedSQSFifo.ts new file mode 100644 index 0000000000..34ff76e705 --- /dev/null +++ b/docs/snippets/batch/gettingStartedSQSFifo.ts @@ -0,0 +1,31 @@ +import { + SqsFifoPartialProcessor, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new SqsFifoPartialProcessor(); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; diff --git a/docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json b/docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json new file mode 100644 index 0000000000..f74c2429a5 --- /dev/null +++ b/docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json @@ -0,0 +1,50 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "failure" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "3275880929", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "SomethingElse": 
{ + "S": "success" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "8640712661", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + } + ] +} diff --git a/docs/snippets/batch/samples/sampleDynamoDBStreamsResponse.json b/docs/snippets/batch/samples/sampleDynamoDBStreamsResponse.json new file mode 100644 index 0000000000..9ccbde9ba9 --- /dev/null +++ b/docs/snippets/batch/samples/sampleDynamoDBStreamsResponse.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "8640712661" + } + ] +} diff --git a/docs/snippets/batch/samples/sampleKinesisEvent.json b/docs/snippets/batch/samples/sampleKinesisEvent.json new file mode 100644 index 0000000000..2721ad7d9a --- /dev/null +++ b/docs/snippets/batch/samples/sampleKinesisEvent.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "4107859083838847772757075850904226111829882106684065", + "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "6006958808509702859251049540584488075644979031228738", + "data": "c3VjY2Vzcw==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": 
"arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} diff --git a/docs/snippets/batch/samples/sampleKinesisResponse.json b/docs/snippets/batch/samples/sampleKinesisResponse.json new file mode 100644 index 0000000000..7ebd013d7f --- /dev/null +++ b/docs/snippets/batch/samples/sampleKinesisResponse.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "6006958808509702859251049540584488075644979031228738" + } + ] +} diff --git a/docs/snippets/batch/samples/sampleSQSEvent.json b/docs/snippets/batch/samples/sampleSQSEvent.json new file mode 100644 index 0000000000..50a411be86 --- /dev/null +++ b/docs/snippets/batch/samples/sampleSQSEvent.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"Message\": \"success\"}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + }, + { + "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + } + ] +} diff --git a/docs/snippets/batch/samples/sampleSQSResponse.json 
b/docs/snippets/batch/samples/sampleSQSResponse.json new file mode 100644 index 0000000000..9802316a68 --- /dev/null +++ b/docs/snippets/batch/samples/sampleSQSResponse.json @@ -0,0 +1,7 @@ +{ + "batchItemFailures": [ + { + "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a" + } + ] +} diff --git a/docs/snippets/batch/templates/sam/dynamodb.yaml b/docs/snippets/batch/templates/sam/dynamodb.yaml new file mode 100644 index 0000000000..c95dea07b7 --- /dev/null +++ b/docs/snippets/batch/templates/sam/dynamodb.yaml @@ -0,0 +1,66 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: nodejs18.x + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: index.handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records from Kinesis/DynamoDB + - Version: '2012-10-17' + Statement: + Effect: 'Allow' + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + DynamoDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt SampleTable.StreamArn + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleTable: + Type: AWS::DynamoDB::Table + Properties: + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + - AttributeName: sk + AttributeType: S + KeySchema: + - AttributeName: pk + KeyType: HASH + - AttributeName: sk + KeyType: RANGE + SSESpecification: + SSEEnabled: true + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES diff --git 
a/docs/snippets/batch/templates/sam/kinesis.yaml b/docs/snippets/batch/templates/sam/kinesis.yaml new file mode 100644 index 0000000000..032b354a74 --- /dev/null +++ b/docs/snippets/batch/templates/sam/kinesis.yaml @@ -0,0 +1,56 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: nodejs18.x + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: index.handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records to DLQ from Kinesis/DynamoDB + - Version: '2012-10-17' + Statement: + Effect: 'Allow' + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + KinesisStream: + Type: Kinesis + Properties: + Stream: !GetAtt SampleStream.Arn + BatchSize: 100 + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 + StreamEncryption: + EncryptionType: KMS + KeyId: alias/aws/kinesis diff --git a/docs/snippets/batch/templates/sam/sqs.yaml b/docs/snippets/batch/templates/sam/sqs.yaml new file mode 100644 index 0000000000..65b91507eb --- /dev/null +++ b/docs/snippets/batch/templates/sam/sqs.yaml @@ -0,0 +1,43 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: partial batch response sample + +Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: nodejs18.x + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + +Resources: + HelloWorldFunction: + Type: 
AWS::Serverless::Function + Properties: + Handler: index.handler + CodeUri: hello_world + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt SampleQueue.QueueName + Events: + Batch: + Type: SQS + Properties: + Queue: !GetAtt SampleQueue.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleQueue: + Type: AWS::SQS::Queue + Properties: + VisibilityTimeout: 30 # Fn timeout * 6 + SqsManagedSseEnabled: true + RedrivePolicy: + maxReceiveCount: 2 + deadLetterTargetArn: !GetAtt SampleDLQ.Arn diff --git a/docs/snippets/batch/testingYourCode.ts b/docs/snippets/batch/testingYourCode.ts new file mode 100644 index 0000000000..a7ffa0e5be --- /dev/null +++ b/docs/snippets/batch/testingYourCode.ts @@ -0,0 +1,32 @@ +import { ContextExamples as dummyContext } from '@aws-lambda-powertools/commons'; +import { handler, processor } from './gettingStartedSQS'; +import sqsEvent from './samples/sampleSQSEvent.json'; + +describe('Function tests', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + test('should return one failed message', async () => { + // Prepare + const context = dummyContext.helloworldContext; + const processorResult = processor; // access processor for additional assertions + const successfulRecord = sqsEvent.Records[0]; + const failedRecord = sqsEvent.Records[1]; + const expectedResponse = { + batchItemFailures: [ + { + itemIdentifier: failedRecord.messageId, + }, + ], + }; + + // Act + const response = await handler(sqsEvent, context); + + // Assess + expect(response).toEqual(expectedResponse); + expect(processorResult.failureMessages).toHaveLength(1); + expect(processorResult.successMessages[0]).toEqual(successfulRecord); + }); +}); diff --git a/docs/snippets/package.json b/docs/snippets/package.json index 5e067b21e3..0258196766 100644 --- a/docs/snippets/package.json +++ b/docs/snippets/package.json @@ -14,8 +14,9 @@ "lint-fix": "eslint --fix --ext .ts,.js --no-error-on-unmatched-pattern ." 
}, "lint-staged": { - "*.ts": "npm run lint-fix", - "*.js": "npm run lint-fix" + "*.{js,ts}": "npm run lint-fix", + "*.json": "prettier --write \"**/samples/*.json\" --single-quote false", + "*.yaml": "prettier --write \"**/templates/sam/*.yaml\"" }, "license": "MIT-0", "repository": { @@ -38,4 +39,4 @@ "axios": "^1.2.4", "hashi-vault-js": "^0.4.13" } -} \ No newline at end of file +} diff --git a/docs/snippets/tsconfig.json b/docs/snippets/tsconfig.json index 59dfbd1435..2d76b059c1 100644 --- a/docs/snippets/tsconfig.json +++ b/docs/snippets/tsconfig.json @@ -35,26 +35,19 @@ "@aws-lambda-powertools/idempotency/persistence": [ "../../packages/idempotency/lib/persistence" ], - "@aws-lambda-powertools/idempotency": [ - "../../packages/idempotency/lib" - ], + "@aws-lambda-powertools/idempotency": ["../../packages/idempotency/lib"], "@aws-lambda-powertools/idempotency/middleware": [ "../../packages/idempotency/lib/middleware" - ] - }, + ], + "@aws-lambda-powertools/batch": ["../../packages/batch/lib"] + } }, - "exclude": [ - "./node_modules" - ], + "exclude": ["./node_modules"], "watchOptions": { "watchFile": "useFsEvents", "watchDirectory": "useFsEvents", "fallbackPolling": "dynamicPriority" }, - "lib": [ - "ES2020" - ], - "types": [ - "node" - ] -} \ No newline at end of file + "lib": ["ES2020"], + "types": ["node"] +} diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md new file mode 100644 index 0000000000..6c8c96d004 --- /dev/null +++ b/docs/utilities/batch.md @@ -0,0 +1,269 @@ +--- +title: Batch Processing +description: Utility +--- + + + +???+ warning + **This page refers to an unreleased utility that has yet to be published on the npm registry. Any version of the package built from source, as well as all future versions tagged with the `-alpha` suffix should be treated as experimental. 
Follow the [Beta release](https://github.com/aws-powertools/powertools-lambda-typescript/milestone/13) milestone for updates on the progress of this utility.** + +The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. + +## Key features + +* Reports batch item failures to reduce number of retries for a record upon errors +* Simple interface to process each batch record +* Build your own batch processor by extending primitives + +## Background + +When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages. + +If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire. + +With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place: + +1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties +2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses + + + +???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it" + We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible. 
+ + You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation. + +## Getting started + +Regardless of whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`. + +You do not need any additional IAM permissions to use this utility, except for what each event source requires. + +### Required resources + +The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted. + +=== "SQS" + + ```yaml title="template.yaml" hl_lines="30-31" + --8<-- "docs/snippets/batch/templates/sam/sqs.yaml" + ``` + +=== "Kinesis Data Streams" + + ```yaml title="template.yaml" hl_lines="44-45" + --8<-- "docs/snippets/batch/templates/sam/kinesis.yaml" + ``` + +=== "DynamoDB Streams" + + ```yaml title="template.yaml" hl_lines="43-44" + --8<-- "docs/snippets/batch/templates/sam/dynamodb.yaml" + ``` + +### Processing messages from SQS + +Processing batches from SQS works in three stages: + +1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type +2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion +3. Use **`processPartialResponse`** to kick off processing + +???+ info + This code example optionally uses Logger for completion. + +=== "index.ts" + + ```typescript hl_lines="1-5 14 17 29-31" + --8<-- "docs/snippets/batch/gettingStartedSQS.ts::32" + ``` + +=== "Sample response" + + The second record failed to be processed, therefore the processor added its message ID in the response.
+ + ```json + --8<-- "docs/snippets/batch/samples/sampleSQSResponse.json" + ``` + +=== "Sample event" + + ```json + --8<-- "docs/snippets/batch/samples/sampleSQSEvent.json" + ``` + +#### FIFO queues + +When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. +This helps preserve the ordering of messages in your queue. + +```typescript hl_lines="1-4 13 28-30" +--8<-- "docs/snippets/batch/gettingStartedSQSFifo.ts" +``` + +### Processing messages from Kinesis + +Processing batches from Kinesis works in three stages: + +1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type +2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion +3. Use **`processPartialResponse`** to kick off processing + +???+ info + This code example optionally uses Logger for completion. + +=== "index.ts" + + ```typescript hl_lines="1-5 14 17 27-29" + --8<-- "docs/snippets/batch/gettingStartedKinesis.ts" + ``` + +=== "Sample response" + + The second record failed to be processed, therefore the processor added its sequence number in the response. + + ```json + --8<-- "docs/snippets/batch/samples/sampleKinesisResponse.json" + ``` + +=== "Sample event" + + ```json + --8<-- "docs/snippets/batch/samples/sampleKinesisEvent.json" + ``` + +### Processing messages from DynamoDB + +Processing batches from DynamoDB Streams works in three stages: + +1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type +2. Define your function to handle each batch record, and use the `DynamoDBRecord` type annotation for autocompletion +3. Use **`processPartialResponse`** to kick off processing + +???+ info + This code example optionally uses Logger for completion.
+ +=== "index.ts" + + ```typescript hl_lines="1-5 14 17 32-34" + --8<-- "docs/snippets/batch/gettingStartedDynamoDBStreams.ts" + ``` + +=== "Sample response" + + The second record failed to be processed, therefore the processor added its sequence number in the response. + + ```json + --8<-- "docs/snippets/batch/samples/sampleDynamoDBStreamsResponse.json" + ``` + +=== "Sample event" + + ```json + --8<-- "docs/snippets/batch/samples/sampleDynamoDBStreamsEvent.json" + ``` + +### Partial failure mechanics + +All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: + +* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` +* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing +* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing + +### Processing messages asynchronously + +You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently. + +???+ question "When is this useful?" + Your use case might be able to process multiple records at the same time without conflicting with one another. + + For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently. + + The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). 
+ +```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor" +--8<-- "docs/snippets/batch/gettingStartedAsync.ts" +``` + +## Advanced + +### Accessing processed messages + +Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function. + +* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record +* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record + +```typescript hl_lines="27-28 30-32 37" title="Accessing processed messages" +--8<-- "docs/snippets/batch/accessProcessedMessages.ts" +``` + +### Accessing Lambda Context + +Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out. + +We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function. + +```typescript hl_lines="17 35" +--8<-- "docs/snippets/batch/accessLambdaContext.ts" +``` + +### Extending BatchProcessor + +You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. 
+ +For these scenarios, you can subclass `BatchProcessor` and quickly override `successHandler` and `failureHandler` methods: + +* **`successHandler()`** – Keeps track of successful batch records +* **`failureHandler()`** – Keeps track of failed batch records + +???+ example + Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing + +```typescript hl_lines="5-6 17-33 35 50-52" title="Extending failure handling mechanism in BatchProcessor" +--8<-- "docs/snippets/batch/extendingFailure.ts" +``` + +### Create your own partial processor + +You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `asyncProcessRecord()` abstract methods. + +* **`processRecord()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`) +* **`prepare()`** – called once as part of the processor initialization +* **`clean()`** – teardown logic called once after `processRecord` completes +* **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic + +You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function. + +```typescript hl_lines="7 11-13 19 28 39 60 71 82 92-94" title="Creating a custom batch processor" +--8<-- "docs/snippets/batch/customPartialProcessor.ts" +``` + +## Testing your code + +As there are no external calls, you can unit test your code with `BatchProcessor` quite easily. + +**Example**: + +Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.
+ +=== "index.test.ts" + + ```typescript + --8<-- "docs/snippets/batch/testingYourCode.ts" + ``` + +=== "index.ts" + + ```typescript + --8<-- "docs/snippets/batch/gettingStartedSQS.ts" + ``` + +=== "Sample SQS event" + + ```json title="events/sqs_event.json" + --8<-- "docs/snippets/batch/samples/sampleSQSEvent.json" + ``` \ No newline at end of file diff --git a/lerna.json b/lerna.json index 27a6315be8..ee57bfb2a1 100644 --- a/lerna.json +++ b/lerna.json @@ -6,6 +6,7 @@ "packages/metrics", "packages/parameters", "packages/idempotency", + "packages/batch", "examples/cdk", "examples/sam", "layers" diff --git a/mkdocs.yml b/mkdocs.yml index f5b7408590..f5f85643d4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,6 +17,7 @@ nav: - Utilities: - utilities/parameters.md - utilities/idempotency.md + - utilities/batch.md theme: name: material diff --git a/package-lock.json b/package-lock.json index b0050f1e8e..9a430ff6eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "packages/tracer", "packages/parameters", "packages/idempotency", + "packages/batch", "docs/snippets", "layers", "examples/cdk", @@ -638,6 +639,10 @@ "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.14.1.tgz", "integrity": "sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==" }, + "node_modules/@aws-lambda-powertools/batch": { + "resolved": "packages/batch", + "link": true + }, "node_modules/@aws-lambda-powertools/commons": { "resolved": "packages/commons", "link": true @@ -6590,6 +6595,10 @@ ], "license": "MIT" }, + "node_modules/batch": { + "resolved": "packages/batch", + "link": true + }, "node_modules/before-after-hook": { "version": "2.2.3", "resolved": "https://registry.npmjs.org/before-after-hook/-/before-after-hook-2.2.3.tgz", @@ -17984,6 +17993,12 @@ "node": ">= 10" } }, + "packages/batch": { + "name": "@aws-lambda-powertools/batch", + "version": "1.10.0", + "license": "MIT-0", + "devDependencies": {} + }, "packages/commons": { 
"name": "@aws-lambda-powertools/commons", "version": "1.11.1", @@ -18482,4 +18497,4 @@ } } } -} +} \ No newline at end of file diff --git a/package.json b/package.json index 467365c53e..73bd6845da 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,8 @@ "docs/snippets", "layers", "examples/cdk", - "examples/sam" + "examples/sam", + "packages/batch" ], "scripts": { "init-environment": "husky install", diff --git a/packages/batch/README.md b/packages/batch/README.md new file mode 100644 index 0000000000..caf8bae3ea --- /dev/null +++ b/packages/batch/README.md @@ -0,0 +1,263 @@ +# Powertools for AWS Lambda (TypeScript) - Batch Processing Utility + + +| ⚠️ **WARNING: Do not use this utility in production just yet!** ⚠️ | +| :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **This utility is currently released as beta developer preview** and is intended strictly for feedback and testing purposes **and not for production workloads**.. The version and all future versions tagged with the `-beta` suffix should be treated as not stable. Up until before the [General Availability release](https://github.com/aws-powertools/powertools-lambda-typescript/milestone/14) we might introduce significant breaking changes and improvements in response to customers feedback. | _ | + + +Powertools for AWS Lambda (TypeScript) is a developer toolkit to implement Serverless [best practices and increase developer velocity](https://docs.powertools.aws.dev/lambda-typescript/latest/#features). 
+ +You can use the package in both TypeScript and JavaScript code bases. + +- [Intro](#intro) +- [Key features](#key-features) +- [Usage](#usage) + - [Batch Processor](#batch-processor) + - [SQS Processor](#sqs-processor) + - [Kinesis Processor](#kinesis-processor) + - [DynamoDB Streams Processor](#dynamodb-streams-processor) + - [Async processing](#async-processing) +- [Contribute](#contribute) +- [Roadmap](#roadmap) +- [Connect](#connect) +- [How to support Powertools for AWS Lambda (TypeScript)?](#how-to-support-powertools-for-aws-lambda-typescript) + - [Becoming a reference customer](#becoming-a-reference-customer) + - [Sharing your work](#sharing-your-work) + - [Using Lambda Layer](#using-lambda-layer) +- [Credits](#credits) +- [License](#license) + +## Intro + +The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams. + +## Key features + +* Reports batch item failures to reduce number of retries for a record upon errors +* Simple interface to process each batch record +* Build your own batch processor by extending primitives + +## Usage + +To get started, install the library by running: + +```sh +npm install @aws-lambda-powertools/batch +``` + +### Batch Processor + +When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages. + +If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire. + +With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. 
+ +### SQS Processor + +When using SQS as a Lambda event source, you can specify the `EventType.SQS` to process the records. The response will be a `SQSBatchResponse` which contains a list of items that failed to be processed. + +```ts +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.SQS); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; +export { processor }; +``` + +### Kinesis Processor + +When using Kinesis Data Streams as a Lambda event source, you can specify the `EventType.KinesisDataStreams` to process the records. The response will be a `KinesisStreamBatchResponse` which contains a list of items that failed to be processed. 
+ +```ts +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + KinesisStreamEvent, + KinesisStreamRecord, + Context, + KinesisStreamBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.KinesisDataStreams); +const logger = new Logger(); + +const recordHandler = (record: KinesisStreamRecord): void => { + logger.info('Processing record', { record: record.kinesis.data }); + const payload = JSON.parse(record.kinesis.data); + logger.info('Processed item', { item: payload }); +}; + +export const handler = async ( + event: KinesisStreamEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; +``` + +### DynamoDB Streams Processor + +When using DynamoDB Streams as a Lambda event source, you can use the `BatchProcessor` with the `EventType.DynamoDBStreams` to process the records. The response will be a `DynamoDBBatchResponse` which contains a list of items that failed to be processed. 
+ +```ts +import { + BatchProcessor, + EventType, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + DynamoDBStreamEvent, + DynamoDBRecord, + Context, + DynamoDBBatchResponse, +} from 'aws-lambda'; + +const processor = new BatchProcessor(EventType.DynamoDBStreams); +const logger = new Logger(); + +const recordHandler = (record: DynamoDBRecord): void => { + if (record.dynamodb && record.dynamodb.NewImage) { + logger.info('Processing record', { record: record.dynamodb.NewImage }); + const message = record.dynamodb.NewImage.Message.S; + if (message) { + const payload = JSON.parse(message); + logger.info('Processed item', { item: payload }); + } + } +}; + +export const handler = async ( + event: DynamoDBStreamEvent, + context: Context +): Promise => { + return processPartialResponse(event, recordHandler, processor, { + context, + }); +}; +``` + +### Async processing + +If your use case allows you to process multiple records at the same time without conflicting with each other, you can use the `AsyncBatchProcessor` to process records asynchronously. This will create an array of promises that will be resolved once all records have been processed. 
+ +```ts +import { + AsyncBatchProcessor, + EventType, + asyncProcessPartialResponse, +} from '@aws-lambda-powertools/batch'; +import axios from 'axios'; // axios is an external dependency +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new AsyncBatchProcessor(EventType.SQS); + +const recordHandler = async (record: SQSRecord): Promise => { + const res = await axios.post('https://httpbin.org/anything', { + message: record.body, + }); + + return res.status; +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return await asyncProcessPartialResponse(event, recordHandler, processor, { + context, + }); +}; +``` + +Check the [docs](https://docs.powertools.aws.dev/lambda/typescript/latest/utilities/batch/) for more examples. + + + +## Contribute + +If you are interested in contributing to this project, please refer to our [Contributing Guidelines](https://github.com/aws-powertools/powertools-lambda-typescript/blob/main/CONTRIBUTING.md). + +## Roadmap + +The roadmap of Powertools for AWS Lambda (TypeScript) is driven by customers’ demand. +Help us prioritize upcoming functionalities or utilities by [upvoting existing RFCs and feature requests](https://github.com/aws-powertools/powertools-lambda-typescript/issues), or [creating new ones](https://github.com/aws-powertools/powertools-lambda-typescript/issues/new/choose), in this GitHub repository. + +## Connect + +* **Powertools for AWS Lambda on Discord**: `#typescript` - **[Invite link](https://discord.gg/B8zZKbbyET)** +* **Email**: aws-lambda-powertools-feedback@amazon.com + +## How to support Powertools for AWS Lambda (TypeScript)? + +### Becoming a reference customer + +Knowing which companies are using this library is important to help prioritize the project internally. 
If your company is using Powertools for AWS Lambda (TypeScript), you can request to have your name and logo added to the README file by raising a [Support Powertools for AWS Lambda (TypeScript) (become a reference)](https://github.com/aws-powertools/powertools-lambda-typescript/issues/new?assignees=&labels=customer-reference&template=support_powertools.yml&title=%5BSupport+Lambda+Powertools%5D%3A+%3Cyour+organization+name%3E) issue. + +The following companies, among others, use Powertools: + +* [Hashnode](https://hashnode.com/) +* [Trek10](https://www.trek10.com/) +* [Elva](https://elva-group.com) +* [globaldatanet](https://globaldatanet.com/) +* [Bailey Nelson](https://www.baileynelson.com.au) +* [Perfect Post](https://www.perfectpost.fr) +* [Sennder](https://sennder.com/) +* [Certible](https://www.certible.com/) + +### Sharing your work + +Share what you did with Powertools for AWS Lambda (TypeScript) 💞💞. Blog post, workshops, presentation, sample apps and others. Check out what the community has already shared about Powertools for AWS Lambda (TypeScript) [here](https://docs.powertools.aws.dev/lambda-typescript/latest/we_made_this). + +### Using Lambda Layer + +This helps us understand who uses Powertools for AWS Lambda (TypeScript) in a non-intrusive way, and helps us gain future investments for other Powertools for AWS Lambda languages. When [using Layers](#lambda-layers), you can add Powertools as a dev dependency to not impact the development process. + +## Credits + +Credits for the Powertools for AWS Lambda (TypeScript) idea go to [DAZN](https://github.com/getndazn) and their [DAZN Lambda Powertools](https://github.com/getndazn/dazn-lambda-powertools/). + +## License + +This library is licensed under the MIT-0 License. See the LICENSE file.
diff --git a/packages/batch/jest.config.js b/packages/batch/jest.config.js new file mode 100644 index 0000000000..3db7c7a6da --- /dev/null +++ b/packages/batch/jest.config.js @@ -0,0 +1,28 @@ +module.exports = { + displayName: { + name: 'Powertools for AWS Lambda (TypeScript) utility: BATCH', + color: 'orange', + }, + runner: 'groups', + preset: 'ts-jest', + transform: { + '^.+\\.ts?$': 'ts-jest', + }, + moduleFileExtensions: ['js', 'ts'], + collectCoverageFrom: ['**/src/**/*.ts', '!**/node_modules/**'], + testMatch: ['**/?(*.)+(spec|test).ts'], + roots: ['/src', '/tests'], + testPathIgnorePatterns: ['/node_modules/'], + testEnvironment: 'node', + coveragePathIgnorePatterns: ['/node_modules/', '/types/'], + coverageThreshold: { + global: { + statements: 100, + branches: 100, + functions: 100, + lines: 100, + }, + }, + coverageReporters: ['json-summary', 'text', 'lcov'], + setupFiles: ['/tests/helpers/populateEnvironmentVariables.ts'], +}; diff --git a/packages/batch/package.json b/packages/batch/package.json new file mode 100644 index 0000000000..f87fb225ac --- /dev/null +++ b/packages/batch/package.json @@ -0,0 +1,54 @@ +{ + "name": "@aws-lambda-powertools/batch", + "version": "1.11.1", + "description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.", + "author": { + "name": "Amazon Web Services", + "url": "https://aws.amazon.com" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "test": "npm run test:unit", + "test:unit": "jest --group=unit --detectOpenHandles --coverage --verbose", + "test:e2e:nodejs14x": "echo 'Not Implemented'", + "test:e2e:nodejs16x": "echo 'Not Implemented'", + "test:e2e:nodejs18x": "echo 'Not Implemented'", + "test:e2e": "echo 'Not Implemented'", + "watch": "jest --watch", + "build": "tsc", + "lint": "eslint --ext .ts,.js --no-error-on-unmatched-pattern .", + "lint-fix": "eslint --fix --ext .ts,.js --no-error-on-unmatched-pattern .", + "prebuild": "rimraf ./lib", + "prepack": "node 
../../.github/scripts/release_patch_package_json.js ." + }, + "lint-staged": { + "*.{js,ts}": "npm run lint-fix" + }, + "homepage": "https://github.com/aws-powertools/powertools-lambda-typescript/tree/main/packages/batch#readme", + "license": "MIT-0", + "main": "./lib/index.js", + "types": "./lib/index.d.ts", + "files": [ + "lib" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/aws-powertools/powertools-lambda-typescript.git" + }, + "bugs": { + "url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" + }, + "dependencies": {}, + "keywords": [ + "aws", + "lambda", + "powertools", + "batch", + "batch-processing", + "serverless", + "nodejs" + ], + "devDependencies": {} +} \ No newline at end of file diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/AsyncBatchProcessor.ts new file mode 100644 index 0000000000..781c7f1c79 --- /dev/null +++ b/packages/batch/src/AsyncBatchProcessor.ts @@ -0,0 +1,31 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; + +/** + * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB + */ +class AsyncBatchProcessor extends BasePartialBatchProcessor { + public async asyncProcessRecord( + record: BaseRecord + ): Promise { + try { + const data = this.toBatchType(record, this.eventType); + const result = await this.handler(data, this.options); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } + } + + /** + * Process a record with instance's handler + * @param record Batch record to be processed + * @returns response of success or failure + */ + public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { + throw new Error('Not implemented. 
Use asyncProcess() instead.'); + } +} + +export { AsyncBatchProcessor }; diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts new file mode 100644 index 0000000000..d4cfd7e9ce --- /dev/null +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -0,0 +1,157 @@ +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { BasePartialProcessor } from './BasePartialProcessor'; +import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; +import { BatchProcessingError } from './errors'; +import type { + EventSourceDataClassTypes, + PartialItemFailureResponse, + PartialItemFailures, +} from './types'; + +/** + * Process batch and partially report failed items + */ +abstract class BasePartialBatchProcessor extends BasePartialProcessor { + public COLLECTOR_MAPPING; + + public batchResponse: PartialItemFailureResponse; + + public eventType: keyof typeof EventType; + + /** + * Initializes base batch processing class + * @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event + */ + public constructor(eventType: keyof typeof EventType) { + super(); + this.eventType = eventType; + this.batchResponse = DEFAULT_RESPONSE; + this.COLLECTOR_MAPPING = { + [EventType.SQS]: () => this.collectSqsFailures(), + [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), + [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), + }; + } + + /** + * Report messages to be deleted in case of partial failures + */ + public clean(): void { + if (!this.hasMessagesToReport()) { + return; + } + + if (this.entireBatchFailed()) { + throw new BatchProcessingError( + 'All records failed processing. 
' + + this.exceptions.length + + ' individual errors logged separately below.', + this.exceptions + ); + } + + const messages: PartialItemFailures[] = this.getMessagesToReport(); + this.batchResponse = { batchItemFailures: messages }; + } + + /** + * Collects identifiers of failed items for a DynamoDB stream + * @returns list of identifiers for failed items + */ + public collectDynamoDBFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber; + if (msgId) { + failures.push({ itemIdentifier: msgId }); + } + } + + return failures; + } + + /** + * Collects identifiers of failed items for a Kinesis stream + * @returns list of identifiers for failed items + */ + public collectKinesisFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber; + failures.push({ itemIdentifier: msgId }); + } + + return failures; + } + + /** + * Collects identifiers of failed items for an SQS batch + * @returns list of identifiers for failed items + */ + public collectSqsFailures(): PartialItemFailures[] { + const failures: PartialItemFailures[] = []; + + for (const msg of this.failureMessages) { + const msgId = (msg as SQSRecord).messageId; + failures.push({ itemIdentifier: msgId }); + } + + return failures; + } + + /** + * Determines whether all records in a batch failed to process + * @returns true if all records resulted in exception results + */ + public entireBatchFailed(): boolean { + return this.exceptions.length == this.records.length; + } + + /** + * Collects identifiers for failed batch items + * @returns formatted messages to use in batch deletion + */ + public getMessagesToReport(): PartialItemFailures[] { + return this.COLLECTOR_MAPPING[this.eventType](); + } + + /** + * Determines if any records failed to 
process + * @returns true if any records resulted in exception + */ + public hasMessagesToReport(): boolean { + return this.failureMessages.length != 0; + } + + /** + * Remove results from previous execution + */ + public prepare(): void { + this.successMessages.length = 0; + this.failureMessages.length = 0; + this.exceptions.length = 0; + this.batchResponse = DEFAULT_RESPONSE; + } + + /** + * @returns Batch items that failed processing, if any + */ + public response(): PartialItemFailureResponse { + return this.batchResponse; + } + + public toBatchType( + record: EventSourceDataClassTypes, + eventType: keyof typeof EventType + ): SQSRecord | KinesisStreamRecord | DynamoDBRecord { + return DATA_CLASS_MAPPING[eventType](record); + } +} + +export { BasePartialBatchProcessor }; diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts new file mode 100644 index 0000000000..ecd62c29b0 --- /dev/null +++ b/packages/batch/src/BasePartialProcessor.ts @@ -0,0 +1,168 @@ +import type { + BaseRecord, + BatchProcessingOptions, + EventSourceDataClassTypes, + FailureResponse, + ResultType, + SuccessResponse, +} from './types'; + +/** + * Abstract class for batch processors. 
+ */ +abstract class BasePartialProcessor { + public exceptions: Error[]; + + public failureMessages: EventSourceDataClassTypes[]; + + public handler: CallableFunction; + + public options?: BatchProcessingOptions; + + public records: BaseRecord[]; + + public successMessages: EventSourceDataClassTypes[]; + + /** + * Initializes base processor class + */ + public constructor() { + this.successMessages = []; + this.failureMessages = []; + this.exceptions = []; + this.records = []; + this.handler = new Function(); + } + + /** + * Call instance's handler for each record + * @returns List of processed records + */ + public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> { + /** + * If this is an sync processor, user should have called process instead, + * so we call the method early to throw the error early thus failing fast. + */ + if (this.constructor.name === 'BatchProcessor') { + await this.asyncProcessRecord(this.records[0]); + } + this.prepare(); + + const processingPromises: Promise[] = + this.records.map((record) => this.asyncProcessRecord(record)); + + const processedRecords: (SuccessResponse | FailureResponse)[] = + await Promise.all(processingPromises); + + this.clean(); + + return processedRecords; + } + + /** + * Process a record with an asyncronous handler + * + * @param record Record to be processed + */ + public abstract asyncProcessRecord( + record: BaseRecord + ): Promise; + + /** + * Clean class instance after processing + */ + public abstract clean(): void; + + /** + * Keeps track of batch records that failed processing + * @param record record that failed processing + * @param exception exception that was thrown + * @returns FailureResponse object with ["fail", exception, original record] + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + const entry: FailureResponse = ['fail', exception.message, record]; + this.exceptions.push(exception); + 
this.failureMessages.push(record); + + return entry; + } + + /** + * Prepare class instance before processing + */ + public abstract prepare(): void; + + /** + * Call instance's handler for each record + * @returns List of processed records + */ + public process(): (SuccessResponse | FailureResponse)[] { + /** + * If this is an async processor, user should have called processAsync instead, + * so we call the method early to throw the error early thus failing fast. + */ + if (this.constructor.name === 'AsyncBatchProcessor') { + this.processRecord(this.records[0]); + } + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + for (const record of this.records) { + processedRecords.push(this.processRecord(record)); + } + + this.clean(); + + return processedRecords; + } + + /** + * Process a record with the handler + * @param record Record to be processed + */ + public abstract processRecord( + record: BaseRecord + ): SuccessResponse | FailureResponse; + + /** + * Set class instance attributes before execution + * @param records List of records to be processed + * @param handler CallableFunction to process entries of "records" + * @returns this object + */ + public register( + records: BaseRecord[], + handler: CallableFunction, + options?: BatchProcessingOptions + ): BasePartialProcessor { + this.records = records; + this.handler = handler; + + if (options) { + this.options = options; + } + + return this; + } + + /** + * Keeps track of batch records that were processed successfully + * @param record record that succeeded processing + * @param result result from record handler + * @returns SuccessResponse object with ["success", result, original record] + */ + public successHandler( + record: EventSourceDataClassTypes, + result: ResultType + ): SuccessResponse { + const entry: SuccessResponse = ['success', result, record]; + this.successMessages.push(record); + + return entry; + } +} + +export { BasePartialProcessor }; diff --git 
a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts new file mode 100644 index 0000000000..3d2a75a8da --- /dev/null +++ b/packages/batch/src/BatchProcessor.ts @@ -0,0 +1,31 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; + +/** + * Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB + */ +class BatchProcessor extends BasePartialBatchProcessor { + public async asyncProcessRecord( + _record: BaseRecord + ): Promise { + throw new Error('Not implemented. Use process() instead.'); + } + + /** + * Process a record with instance's handler + * @param record Batch record to be processed + * @returns response of success or failure + */ + public processRecord(record: BaseRecord): SuccessResponse | FailureResponse { + try { + const data = this.toBatchType(record, this.eventType); + const result = this.handler(data, this.options); + + return this.successHandler(record, result); + } catch (error) { + return this.failureHandler(record, error as Error); + } + } +} + +export { BatchProcessor }; diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts new file mode 100644 index 0000000000..0c10993273 --- /dev/null +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -0,0 +1,68 @@ +import { BatchProcessor } from './BatchProcessor'; +import { EventType } from './constants'; +import type { FailureResponse, SuccessResponse } from './types'; + +/** + * Process native partial responses from SQS FIFO queues + * Stops processing records when the first record fails + * The remaining records are reported as failed items + */ +class SqsFifoPartialProcessor extends BatchProcessor { + public constructor() { + super(EventType.SQS); + } + + /** + * Call instance's handler for each record. 
+ * When the first failed message is detected, the process is short-circuited + * And the remaining messages are reported as failed items + */ + public process(): (SuccessResponse | FailureResponse)[] { + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + let currentIndex = 0; + for (const record of this.records) { + // If we have any failed messages, it means the last message failed + // We should then short circuit the process and fail remaining messages + if (this.failureMessages.length != 0) { + return this.shortCircuitProcessing(currentIndex, processedRecords); + } + + processedRecords.push(this.processRecord(record)); + currentIndex++; + } + + this.clean(); + + return processedRecords; + } + + /** + * Starting from the first failure index, fail all remaining messages and append them to the result list + * @param firstFailureIndex Index of first message that failed + * @param result List of success and failure responses with remaining messages failed + */ + public shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + const data = this.toBatchType(record, this.eventType); + processedRecords.push( + this.failureHandler( + data, + new Error('A previous record failed processing') + ) + ); + } + + this.clean(); + + return processedRecords; + } +} + +export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/asyncProcessPartialResponse.ts new file mode 100644 index 0000000000..eee584ed1f --- /dev/null +++ b/packages/batch/src/asyncProcessPartialResponse.ts @@ -0,0 +1,38 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { EventType } from './constants'; +import type { + BaseRecord, + BatchProcessingOptions, + 
PartialItemFailureResponse, +} from './types'; + +/** + * Higher level function to handle batch event processing + * @param event Lambda's original event + * @param recordHandler Callable function to process each record from the batch + * @param processor Batch processor to handle partial failure cases + * @returns Lambda Partial Batch Response + */ +const asyncProcessPartialResponse = async ( + event: { Records: BaseRecord[] }, + recordHandler: CallableFunction, + processor: BasePartialBatchProcessor, + options?: BatchProcessingOptions +): Promise => { + if (!event.Records) { + const eventTypes: string = Object.values(EventType).toString(); + throw new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' + ); + } + + processor.register(event.Records, recordHandler, options); + + await processor.asyncProcess(); + + return processor.response(); +}; + +export { asyncProcessPartialResponse }; diff --git a/packages/batch/src/constants.ts b/packages/batch/src/constants.ts new file mode 100644 index 0000000000..02437e356c --- /dev/null +++ b/packages/batch/src/constants.ts @@ -0,0 +1,25 @@ +import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; +import type { + PartialItemFailureResponse, + EventSourceDataClassTypes, +} from './types'; + +const EventType = { + SQS: 'SQS', + KinesisDataStreams: 'KinesisDataStreams', + DynamoDBStreams: 'DynamoDBStreams', +} as const; + +const DEFAULT_RESPONSE: PartialItemFailureResponse = { + batchItemFailures: [], +}; + +const DATA_CLASS_MAPPING = { + [EventType.SQS]: (record: EventSourceDataClassTypes) => record as SQSRecord, + [EventType.KinesisDataStreams]: (record: EventSourceDataClassTypes) => + record as KinesisStreamRecord, + [EventType.DynamoDBStreams]: (record: EventSourceDataClassTypes) => + record as DynamoDBRecord, +}; + +export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING }; diff --git a/packages/batch/src/errors.ts 
b/packages/batch/src/errors.ts new file mode 100644 index 0000000000..ed5bd4fc9e --- /dev/null +++ b/packages/batch/src/errors.ts @@ -0,0 +1,49 @@ +/** + * Base error type for batch processing + * All errors thrown by major failures extend this base class + */ +class BaseBatchProcessingError extends Error { + public childErrors: Error[]; + + public msg: string; + + public constructor(msg: string, childErrors: Error[]) { + super(msg); + this.msg = msg; + this.childErrors = childErrors; + } + + /** + * Generates a list of errors that were generated by the major failure + * @returns Formatted string listing all the errors that occurred + * + * @example + * When all batch records fail to be processed, this will generate a string like: + * All records failed processing. 3 individual errors logged separately below. + * ,Failed to process record. + * ,Failed to process record. + * ,Failed to process record. + */ + public formatErrors(parentErrorString: string): string { + const errorList: string[] = [parentErrorString + '\n']; + + for (const error of this.childErrors) { + errorList.push(error.message + '\n'); + } + + return '\n' + errorList; + } +} + +/** + * When all batch records failed to be processed + */ +class BatchProcessingError extends BaseBatchProcessingError { + public constructor(msg: string, childErrors: Error[]) { + super(msg, childErrors); + const parentErrorString: string = this.message; + this.message = this.formatErrors(parentErrorString); + } +} + +export { BaseBatchProcessingError, BatchProcessingError }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts new file mode 100644 index 0000000000..96f931823d --- /dev/null +++ b/packages/batch/src/index.ts @@ -0,0 +1,10 @@ +export * from './constants'; +export * from './errors'; +export * from './types'; +export * from './BasePartialProcessor'; +export * from './BasePartialBatchProcessor'; +export * from './BatchProcessor'; +export * from './AsyncBatchProcessor'; +export * from 
'./processPartialResponse'; +export * from './asyncProcessPartialResponse'; +export * from './SqsFifoPartialProcessor'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts new file mode 100644 index 0000000000..d09e7be6b9 --- /dev/null +++ b/packages/batch/src/processPartialResponse.ts @@ -0,0 +1,38 @@ +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { EventType } from './constants'; +import type { + BaseRecord, + BatchProcessingOptions, + PartialItemFailureResponse, +} from './types'; + +/** + * Higher level function to handle batch event processing + * @param event Lambda's original event + * @param recordHandler Callable function to process each record from the batch + * @param processor Batch processor to handle partial failure cases + * @returns Lambda Partial Batch Response + */ +const processPartialResponse = ( + event: { Records: BaseRecord[] }, + recordHandler: CallableFunction, + processor: BasePartialBatchProcessor, + options?: BatchProcessingOptions +): PartialItemFailureResponse => { + if (!event.Records) { + const eventTypes: string = Object.values(EventType).toString(); + throw new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' 
+ ); + } + + processor.register(event.Records, recordHandler, options); + + processor.process(); + + return processor.response(); +}; + +export { processPartialResponse }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts new file mode 100644 index 0000000000..17ce3633c7 --- /dev/null +++ b/packages/batch/src/types.ts @@ -0,0 +1,37 @@ +import { + Context, + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; + +type BatchProcessingOptions = { + context: Context; +}; + +type EventSourceDataClassTypes = + | SQSRecord + | KinesisStreamRecord + | DynamoDBRecord; + +type RecordValue = unknown; +type BaseRecord = { [key: string]: RecordValue } | EventSourceDataClassTypes; + +type ResultType = unknown; +type SuccessResponse = ['success', ResultType, EventSourceDataClassTypes]; + +type FailureResponse = ['fail', string, EventSourceDataClassTypes]; + +type PartialItemFailures = { itemIdentifier: string }; +type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; + +export type { + BatchProcessingOptions, + BaseRecord, + EventSourceDataClassTypes, + ResultType, + SuccessResponse, + FailureResponse, + PartialItemFailures, + PartialItemFailureResponse, +}; diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts new file mode 100644 index 0000000000..7df6110742 --- /dev/null +++ b/packages/batch/tests/helpers/factories.ts @@ -0,0 +1,75 @@ +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { randomInt, randomUUID } from 'node:crypto'; + +const sqsRecordFactory = (body: string): SQSRecord => { + return { + messageId: randomUUID(), + receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', + body: body, + attributes: { + ApproximateReceiveCount: '1', + SentTimestamp: '1545082649183', + SenderId: 'AIDAIENQZJOLO23YVJ4VO', + ApproximateFirstReceiveTimestamp: '1545082649185', + }, + messageAttributes: {}, + md5OfBody: 
'e4e68fb7bd0e697a0ae8f1bb342846b3', + eventSource: 'aws:sqs', + eventSourceARN: 'arn:aws:sqs:us-east-2:123456789012:my-queue', + awsRegion: 'us-east-1', + }; +}; + +const kinesisRecordFactory = (body: string): KinesisStreamRecord => { + let seq = ''; + for (let i = 0; i < 52; i++) { + seq = seq + randomInt(10); + } + + return { + kinesis: { + kinesisSchemaVersion: '1.0', + partitionKey: '1', + sequenceNumber: seq, + data: body, + approximateArrivalTimestamp: 1545084650.987, + }, + eventSource: 'aws:kinesis', + eventVersion: '1.0', + eventID: 'shardId-000000000006:' + seq, + eventName: 'aws:kinesis:record', + invokeIdentityArn: 'arn:aws:iam::123456789012:role/lambda-role', + awsRegion: 'us-east-2', + eventSourceARN: + 'arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream', + }; +}; + +const dynamodbRecordFactory = (body: string): DynamoDBRecord => { + let seq = ''; + for (let i = 0; i < 10; i++) { + seq = seq + randomInt(10); + } + + return { + eventID: '1', + eventVersion: '1.0', + dynamodb: { + Keys: { Id: { N: '101' } }, + NewImage: { Message: { S: body } }, + StreamViewType: 'NEW_AND_OLD_IMAGES', + SequenceNumber: seq, + SizeBytes: 26, + }, + awsRegion: 'us-west-2', + eventName: 'INSERT', + eventSourceARN: 'eventsource_arn', + eventSource: 'aws:dynamodb', + }; +}; + +export { sqsRecordFactory, kinesisRecordFactory, dynamodbRecordFactory }; diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts new file mode 100644 index 0000000000..3a6d17b76a --- /dev/null +++ b/packages/batch/tests/helpers/handlers.ts @@ -0,0 +1,109 @@ +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import type { BatchProcessingOptions } from '../../src/types'; + +const sqsRecordHandler = (record: SQSRecord): string => { + const body = record.body; + if (body.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const asyncSqsRecordHandler = async (record: 
SQSRecord): Promise => { + const body = record.body; + if (body.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const kinesisRecordHandler = (record: KinesisStreamRecord): string => { + const body = record.kinesis.data; + if (body.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const asyncKinesisRecordHandler = async ( + record: KinesisStreamRecord +): Promise => { + const body = record.kinesis.data; + if (body.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const dynamodbRecordHandler = (record: DynamoDBRecord): object => { + const body = record.dynamodb?.NewImage?.Message || { S: 'fail' }; + if (body['S']?.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const asyncDynamodbRecordHandler = async ( + record: DynamoDBRecord +): Promise => { + const body = record.dynamodb?.NewImage?.Message || { S: 'fail' }; + if (body['S']?.includes('fail')) { + throw Error('Failed to process record.'); + } + + return body; +}; + +const handlerWithContext = ( + record: SQSRecord, + options: BatchProcessingOptions +): string => { + const context = options.context; + + try { + if (context.getRemainingTimeInMillis() == 0) { + throw Error('No time remaining.'); + } + } catch (e) { + throw Error('Context possibly malformed. Displaying context:\n' + context); + } + + return record.body; +}; + +const asyncHandlerWithContext = async ( + record: SQSRecord, + options: BatchProcessingOptions +): Promise => { + const context = options.context; + + try { + if (context.getRemainingTimeInMillis() == 0) { + throw Error('No time remaining.'); + } + } catch (e) { + throw Error('Context possibly malformed. 
Displaying context:\n' + context); + } + + return Promise.resolve(record.body); +}; + +export { + sqsRecordHandler, + asyncSqsRecordHandler, + kinesisRecordHandler, + asyncKinesisRecordHandler, + dynamodbRecordHandler, + asyncDynamodbRecordHandler, + handlerWithContext, + asyncHandlerWithContext, +}; diff --git a/packages/batch/tests/helpers/populateEnvironmentVariables.ts b/packages/batch/tests/helpers/populateEnvironmentVariables.ts new file mode 100644 index 0000000000..cb0b37f295 --- /dev/null +++ b/packages/batch/tests/helpers/populateEnvironmentVariables.ts @@ -0,0 +1,12 @@ +// Reserved variables +process.env._X_AMZN_TRACE_ID = '1-abcdef12-3456abcdef123456abcdef12'; +process.env.AWS_LAMBDA_FUNCTION_NAME = 'my-lambda-function'; +process.env.AWS_EXECUTION_ENV = 'nodejs18.x'; +process.env.AWS_LAMBDA_FUNCTION_MEMORY_SIZE = '128'; +if ( + process.env.AWS_REGION === undefined && + process.env.CDK_DEFAULT_REGION === undefined +) { + process.env.AWS_REGION = 'eu-west-1'; +} +process.env._HANDLER = 'index.handler'; diff --git a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts new file mode 100644 index 0000000000..9079a1c464 --- /dev/null +++ b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts @@ -0,0 +1,296 @@ +/** + * Test AsyncBatchProcessor class + * + * @group unit/batch/class/asyncBatchProcessor + */ +import type { Context } from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor'; +import { EventType } from '../../src/constants'; +import { BatchProcessingError } from '../../src/errors'; +import type { BatchProcessingOptions } from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + asyncDynamodbRecordHandler, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, + 
asyncHandlerWithContext, +} from '../helpers/handlers'; + +describe('Class: AsyncBatchProcessor', () => { + const ENVIRONMENT_VARIABLES = process.env; + const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Asynchronously processing SQS Records', () => { + test('Batch processing SQS records with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing SQS records with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); + + test('Batch processing SQS records with all failures', async () => { + // Prepare + const firstRecord = 
sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler); + + // Assess + await expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Asynchronously processing Kinesis Records', () => { + test('Batch processing Kinesis records with no failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); + + test('Batch processing Kinesis records with some failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: 
thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); + + test('Batch processing Kinesis records with all failures', async () => { + // Prepare + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, asyncKinesisRecordHandler); + + // Assess + await expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Asynchronously processing DynamoDB Records', () => { + test('Batch processing DynamoDB records with no failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); + }); + + test('Batch processing DynamoDB records with some failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + 
secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); + }); + + test('Batch processing DynamoDB records with all failures', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, asyncDynamodbRecordHandler); + + // Assess + await expect(processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + describe('Batch processing with Lambda context', () => { + test('Batch processing when context is provided and handler accepts', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncHandlerWithContext, options); + const processedMessages = await processor.asyncProcess(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when context is provided and handler does not accept', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + processor.register(records, asyncSqsRecordHandler, options); + const processedMessages = await processor.asyncProcess(); + + // Assess + 
expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when malformed context is provided and handler attempts to use', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const badContext = { foo: 'bar' }; + const badOptions = { context: badContext as unknown as Context }; + + // Act + processor.register(records, asyncHandlerWithContext, badOptions); + await expect(() => processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError + ); + }); + }); + + test('When calling the sync process method, it should throw an error', () => { + // Prepare + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act & Assess + expect(() => processor.process()).toThrowError( + 'Not implemented. Use asyncProcess() instead.' 
+ ); + }); +}); diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts new file mode 100644 index 0000000000..5be28271d2 --- /dev/null +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -0,0 +1,285 @@ +/** + * Test BatchProcessor class + * + * @group unit/batch/class/batchprocessor + */ +import type { Context } from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { BatchProcessor } from '../../src/BatchProcessor'; +import { EventType } from '../../src/constants'; +import { BatchProcessingError } from '../../src/errors'; +import type { BatchProcessingOptions } from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + dynamodbRecordHandler, + handlerWithContext, + kinesisRecordHandler, + sqsRecordHandler, +} from '../helpers/handlers'; + +describe('Class: BatchProcessor', () => { + const ENVIRONMENT_VARIABLES = process.env; + const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Synchronously processing SQS Records', () => { + test('Batch processing SQS records with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing SQS records with some failures', () 
=> { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('success'); + const thirdRecord = sqsRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.body, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.messageId }, + { itemIdentifier: thirdRecord.messageId }, + ], + }); + }); + + test('Batch processing SQS records with all failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('failure'); + const secondRecord = sqsRecordFactory('failure'); + const thirdRecord = sqsRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + processor.register(records, sqsRecordHandler); + expect(() => processor.process()).toThrowError(BatchProcessingError); + }); + }); + + describe('Synchronously processing Kinesis Records', () => { + test('Batch processing Kinesis records with no failures', () => { + // Prepare + const firstRecord = kinesisRecordFactory('success'); + const secondRecord = kinesisRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.kinesis.data, firstRecord], + ['success', secondRecord.kinesis.data, secondRecord], + ]); + }); + + test('Batch processing Kinesis records with some failures', () => { + // Prepare + const 
firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('success'); + const thirdRecord = kinesisRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.kinesis.data, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.kinesis.sequenceNumber }, + { itemIdentifier: thirdRecord.kinesis.sequenceNumber }, + ], + }); + }); + + test('Batch processing Kinesis records with all failures', () => { + const firstRecord = kinesisRecordFactory('failure'); + const secondRecord = kinesisRecordFactory('failure'); + const thirdRecord = kinesisRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + + // Act + processor.register(records, kinesisRecordHandler); + + // Assess + expect(() => processor.process()).toThrowError(BatchProcessingError); + }); + }); + + describe('Synchronously processing DynamoDB Records', () => { + test('Batch processing DynamoDB records with no failures', () => { + // Prepare + const firstRecord = dynamodbRecordFactory('success'); + const secondRecord = dynamodbRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.dynamodb?.NewImage?.Message, firstRecord], + ['success', secondRecord.dynamodb?.NewImage?.Message, secondRecord], + ]); 
+ }); + + test('Batch processing DynamoDB records with some failures', () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('success'); + const thirdRecord = dynamodbRecordFactory('fail'); + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages[1]).toStrictEqual([ + 'success', + secondRecord.dynamodb?.NewImage?.Message, + secondRecord, + ]); + expect(processor.failureMessages.length).toBe(2); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: firstRecord.dynamodb?.SequenceNumber }, + { itemIdentifier: thirdRecord.dynamodb?.SequenceNumber }, + ], + }); + }); + + test('Batch processing DynamoDB records with all failures', () => { + // Prepare + const firstRecord = dynamodbRecordFactory('failure'); + const secondRecord = dynamodbRecordFactory('failure'); + const thirdRecord = dynamodbRecordFactory('fail'); + + const records = [firstRecord, secondRecord, thirdRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + + // Act + processor.register(records, dynamodbRecordHandler); + + // Assess + expect(() => processor.process()).toThrowError(BatchProcessingError); + }); + }); + + describe('Batch processing with Lambda context', () => { + test('Batch processing when context is provided and handler accepts', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, handlerWithContext, options); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', 
firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when context is provided and handler does not accept', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + + // Act + processor.register(records, sqsRecordHandler, options); + const processedMessages = processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', firstRecord.body, firstRecord], + ['success', secondRecord.body, secondRecord], + ]); + }); + + test('Batch processing when malformed context is provided and handler attempts to use', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS); + const badContext = { foo: 'bar' }; + const badOptions = { context: badContext as unknown as Context }; + + // Act + processor.register(records, handlerWithContext, badOptions); + expect(() => processor.process()).toThrowError(BatchProcessingError); + }); + }); + + test('When calling the async process method, it should throw an error', async () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + + // Act & Assess + await expect(() => processor.asyncProcess()).rejects.toThrow( + 'Not implemented. Use process() instead.' 
+ ); + }); +}); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts new file mode 100644 index 0000000000..564886b1d8 --- /dev/null +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -0,0 +1,59 @@ +/** + * Test SqsFifoBatchProcessor class + * + * @group unit/batch/class/sqsfifobatchprocessor + */ +import { SqsFifoPartialProcessor, processPartialResponse } from '../../src'; +import { sqsRecordFactory } from '../helpers/factories'; +import { sqsRecordHandler } from '../helpers/handlers'; + +describe('Class: SqsFifoBatchProcessor', () => { + const ENVIRONMENT_VARIABLES = process.env; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Synchronous SQS FIFO batch processing', () => { + test('SQS FIFO Batch processor with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponse(event, sqsRecordHandler, processor); + + // Assess + expect(result['batchItemFailures']).toStrictEqual([]); + }); + + test('SQS FIFO Batch processor with failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponse(event, sqsRecordHandler, processor); + + // Assess + expect(result['batchItemFailures'].length).toBe(2); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + 
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + }); + }); +}); diff --git a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts new file mode 100644 index 0000000000..fde15ccf42 --- /dev/null +++ b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts @@ -0,0 +1,231 @@ +/** + * Test asyncProcessPartialResponse function + * + * @group unit/batch/function/asyncProcesspartialresponse + */ +import type { + Context, + DynamoDBStreamEvent, + KinesisStreamEvent, + SQSEvent, +} from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; +import { AsyncBatchProcessor, asyncProcessPartialResponse } from '../../src'; +import { EventType } from '../../src/constants'; +import type { + BatchProcessingOptions, + PartialItemFailureResponse, +} from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + asyncDynamodbRecordHandler, + asyncHandlerWithContext, + asyncKinesisRecordHandler, + asyncSqsRecordHandler, +} from '../helpers/handlers'; + +describe('Function: processPartialResponse()', () => { + const ENVIRONMENT_VARIABLES = process.env; + const context = dummyContext; + const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Process partial response function call tests', () => { + test('Process partial response function call with asynchronous handler', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const 
processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + const ret = await asyncProcessPartialResponse( + batch, + asyncSqsRecordHandler, + processor + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response function call with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new AsyncBatchProcessor(EventType.SQS); + + // Act + const ret = await asyncProcessPartialResponse( + batch, + asyncHandlerWithContext, + processor, + options + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + }); + + describe('Process partial response function call through handler', () => { + test('Process partial response through handler with SQS event', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return asyncProcessPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with Kinesis event', async () => { + // Prepare + const records = [ + kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = async ( + event: KinesisStreamEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncKinesisRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); 
+ + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with DynamoDB event', async () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = async ( + event: DynamoDBStreamEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncDynamodbRecordHandler, + processor + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler for SQS records with incorrect event type', async () => { + // Prepare + const processor = new AsyncBatchProcessor(EventType.SQS); + const event = dummyEvent; + const eventTypes: string = Object.values(EventType).toString(); + + const handler = async ( + event: SQSEvent, + _context: Context + ): Promise => { + return await asyncProcessPartialResponse( + event, + asyncSqsRecordHandler, + processor + ); + }; + + // Act & Assess + await expect(() => + handler(event as unknown as SQSEvent, context) + ).rejects.toThrowError( + new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' 
+ ) + ); + }); + + test('Process partial response through handler with context provided', async () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new AsyncBatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = async ( + event: SQSEvent, + context: Context + ): Promise => { + const options: BatchProcessingOptions = { context: context }; + + return await asyncProcessPartialResponse( + event, + asyncHandlerWithContext, + processor, + options + ); + }; + + // Act + const result = await handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + }); +}); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts new file mode 100644 index 0000000000..3de2edcce3 --- /dev/null +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -0,0 +1,209 @@ +/** + * Test processPartialResponse function + * + * @group unit/batch/function/processpartialresponse + */ +import type { + Context, + DynamoDBStreamEvent, + KinesisStreamEvent, + SQSEvent, +} from 'aws-lambda'; +import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; +import { Custom as dummyEvent } from '../../../commons/src/samples/resources/events'; +import { BatchProcessor, processPartialResponse } from '../../src'; +import { EventType } from '../../src/constants'; +import type { + BatchProcessingOptions, + PartialItemFailureResponse, +} from '../../src/types'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories'; +import { + dynamodbRecordHandler, + handlerWithContext, + kinesisRecordHandler, + sqsRecordHandler, +} from '../helpers/handlers'; + +describe('Function: processPartialResponse()', () => { + const ENVIRONMENT_VARIABLES = process.env; + const context = dummyContext; + 
const options: BatchProcessingOptions = { context: dummyContext }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + process.env = { ...ENVIRONMENT_VARIABLES }; + }); + + afterAll(() => { + process.env = ENVIRONMENT_VARIABLES; + }); + + describe('Process partial response function call tests', () => { + test('Process partial response function call with synchronous handler', () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = processPartialResponse(batch, sqsRecordHandler, processor); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response function call with context provided', () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const batch = { Records: records }; + const processor = new BatchProcessor(EventType.SQS); + + // Act + const ret = processPartialResponse( + batch, + handlerWithContext, + processor, + options + ); + + // Assess + expect(ret).toStrictEqual({ batchItemFailures: [] }); + }); + }); + + describe('Process partial response function call through handler', () => { + test('Process partial response through handler with SQS event', () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponse(event, sqsRecordHandler, processor); + }; + + // Act + const result = handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with Kinesis event', () => { + // Prepare + const records = [ + 
kinesisRecordFactory('success'), + kinesisRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.KinesisDataStreams); + const event: KinesisStreamEvent = { Records: records }; + + const handler = ( + event: KinesisStreamEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponse(event, kinesisRecordHandler, processor); + }; + + // Act + const result = handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler with DynamoDB event', () => { + // Prepare + const records = [ + dynamodbRecordFactory('success'), + dynamodbRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.DynamoDBStreams); + const event: DynamoDBStreamEvent = { Records: records }; + + const handler = ( + event: DynamoDBStreamEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponse(event, dynamodbRecordHandler, processor); + }; + + // Act + const result = handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + + test('Process partial response through handler for SQS records with incorrect event type', () => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const event = dummyEvent; + const eventTypes: string = Object.values(EventType).toString(); + + const handler = ( + event: SQSEvent, + _context: Context + ): PartialItemFailureResponse => { + return processPartialResponse(event, sqsRecordHandler, processor); + }; + + // Act & Assess + expect(() => handler(event as unknown as SQSEvent, context)).toThrowError( + new Error( + 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + + eventTypes + + ' event.' 
+ ) + ); + }); + + test('Process partial response through handler with context provided', () => { + // Prepare + const records = [ + sqsRecordFactory('success'), + sqsRecordFactory('success'), + ]; + const processor = new BatchProcessor(EventType.SQS); + const event: SQSEvent = { Records: records }; + + const handler = ( + event: SQSEvent, + context: Context + ): PartialItemFailureResponse => { + const options: BatchProcessingOptions = { context: context }; + + return processPartialResponse( + event, + handlerWithContext, + processor, + options + ); + }; + + // Act + const result = handler(event, context); + + // Assess + expect(result).toStrictEqual({ batchItemFailures: [] }); + }); + }); +}); diff --git a/packages/batch/tsconfig-dev.json b/packages/batch/tsconfig-dev.json new file mode 100644 index 0000000000..6f766859ea --- /dev/null +++ b/packages/batch/tsconfig-dev.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "declarationMap": true, + "esModuleInterop": false + }, + "include": [ "src/**/*", "examples/**/*", "**/tests/**/*" ], + "types": [ + "jest" + ] +} \ No newline at end of file diff --git a/packages/batch/tsconfig.es.json b/packages/batch/tsconfig.es.json new file mode 100644 index 0000000000..6f766859ea --- /dev/null +++ b/packages/batch/tsconfig.es.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "declarationMap": true, + "esModuleInterop": false + }, + "include": [ "src/**/*", "examples/**/*", "**/tests/**/*" ], + "types": [ + "jest" + ] +} \ No newline at end of file diff --git a/packages/batch/tsconfig.json b/packages/batch/tsconfig.json new file mode 100644 index 0000000000..09df4b9ba4 --- /dev/null +++ b/packages/batch/tsconfig.json @@ -0,0 +1,29 @@ +{ + "compilerOptions": { + "experimentalDecorators": true, + "noImplicitAny": true, + "target": "ES2020", + "module": "commonjs", + "declaration": true, + "outDir": "lib", + "strict": true, + "inlineSourceMap": true, + 
"moduleResolution": "node", + "resolveJsonModule": true, + "pretty": true, + "baseUrl": "src/", + "rootDirs": [ "src/" ], + "esModuleInterop": true + }, + "include": [ "src/**/*" ], + "exclude": [ "./node_modules"], + "watchOptions": { + "watchFile": "useFsEvents", + "watchDirectory": "useFsEvents", + "fallbackPolling": "dynamicPriority" + }, + "lib": [ "es2020" ], + "types": [ + "node" + ] +} \ No newline at end of file diff --git a/packages/batch/typedoc.json b/packages/batch/typedoc.json new file mode 100644 index 0000000000..ed0ca6fc47 --- /dev/null +++ b/packages/batch/typedoc.json @@ -0,0 +1,9 @@ +{ + "extends": [ + "../../typedoc.base.json" + ], + "entryPoints": [ + "./src/index.ts" + ], + "readme": "README.md" +} \ No newline at end of file