Skip to content

Commit

Permalink
feat(batch): add option to continue processing other group IDs on fai…
Browse files Browse the repository at this point in the history
…lure in `SqsFifoPartialProcessor` (#2590)
  • Loading branch information
arnabrahman authored Jun 4, 2024
1 parent 9def9fa commit a615c24
Show file tree
Hide file tree
Showing 9 changed files with 414 additions and 26 deletions.
50 changes: 43 additions & 7 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,25 @@ Processing batches from SQS works in three stages:

#### FIFO queues

When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
This helps preserve the ordering of messages in your queue.
When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html){target="_blank"}, a batch may include messages from different group IDs.

```typescript hl_lines="1-4 8 20-22"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.

Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)
=== "Recommended"

```typescript hl_lines="1-4 8"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Enabling skipGroupOnError flag"

```typescript hl_lines="1-4 13 30"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
```

!!! Note
Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`.
Expand Down Expand Up @@ -283,7 +294,7 @@ sequenceDiagram

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skipGroupOnError` flag.

<center>
```mermaid
Expand All @@ -307,6 +318,31 @@ sequenceDiagram
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skipGroupOnError` flag.

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Process messages from another MessageGroupID
Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages processed
SQS queue-->>SQS queue: Failed messages return
deactivate SQS queue
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>

#### Kinesis and DynamoDB Streams

> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
Expand Down
32 changes: 32 additions & 0 deletions examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {
SqsFifoPartialProcessor,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type {
SQSEvent,
SQSRecord,
Context,
SQSBatchResponse,
} from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();
const logger = new Logger();

const recordHandler = (record: SQSRecord): void => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler = async (
event: SQSEvent,
context: Context
): Promise<SQSBatchResponse> => {
return processPartialResponseSync(event, recordHandler, processor, {
context,
skipGroupOnError: true,
});
};
113 changes: 101 additions & 12 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import { SQSRecord } from 'aws-lambda';
import { BatchProcessorSync } from './BatchProcessorSync.js';
import { EventType } from './constants.js';
import { SqsFifoShortCircuitError } from './errors.js';
import type { FailureResponse, SuccessResponse } from './types.js';
import {
BatchProcessingError,
SqsFifoMessageGroupShortCircuitError,
SqsFifoShortCircuitError,
} from './errors.js';
import type {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
SuccessResponse,
} from './types.js';

/**
* Batch processor for SQS FIFO queues
Expand Down Expand Up @@ -35,8 +45,36 @@ import type { FailureResponse, SuccessResponse } from './types.js';
* ```
*/
class SqsFifoPartialProcessor extends BatchProcessorSync {
/**
* The ID of the current message group being processed.
*/
#currentGroupId?: string;
/**
* A set of group IDs that have already encountered failures.
*/
#failedGroupIds: Set<string>;

public constructor() {
super(EventType.SQS);
this.#failedGroupIds = new Set<string>();
}

/**
* Handles a failure for a given record.
* Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true.
* @param record - The record that failed.
* @param exception - The error that occurred.
* @returns The failure response.
*/
public failureHandler(
record: EventSourceDataClassTypes,
exception: Error
): FailureResponse {
if (this.options?.skipGroupOnError && this.#currentGroupId) {
this.#addToFailedGroup(this.#currentGroupId);
}

return super.failureHandler(record, exception);
}

/**
Expand All @@ -48,8 +86,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
* The method calls the prepare hook to initialize the processor and then
* iterates over each record in the batch, processing them one by one.
*
* If one of them fails, the method short circuits the processing and fails
* the remaining records in the batch.
* If one of them fails and `skipGroupOnError` is not true, the method short circuits
* the processing and fails the remaining records in the batch.
*
* If one of them fails and `skipGroupOnError` is true, then the method fails the current record
* if the message group has any previous failure, otherwise keeps processing.
*
* Then, it calls the clean hook to clean up the processor and returns the
* processed records.
Expand All @@ -60,13 +101,31 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
// If we have any failed messages, it means the last message failed
// We should then short circuit the process and fail remaining messages
if (this.failureMessages.length != 0) {
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);

// If we have any failed messages, we should then short circuit the process and
// fail remaining messages unless `skipGroupOnError` is true
const shouldShortCircuit =
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
if (shouldShortCircuit) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

processedRecords.push(this.processRecordSync(record));
// If `skipGroupOnError` is true and the current group has previously failed,
// then we should skip processing the current group.
const shouldSkipCurrentGroup =
this.options?.skipGroupOnError &&
this.#currentGroupId &&
this.#failedGroupIds.has(this.#currentGroupId);

const result = shouldSkipCurrentGroup
? this.#processFailRecord(
record,
new SqsFifoMessageGroupShortCircuitError()
)
: this.processRecordSync(record);

processedRecords.push(result);
currentIndex++;
}

Expand Down Expand Up @@ -94,16 +153,46 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
const data = this.toBatchType(record, this.eventType);
processedRecords.push(
this.failureHandler(data, new SqsFifoShortCircuitError())
);
this.#processFailRecord(record, new SqsFifoShortCircuitError());
}

this.clean();

return processedRecords;
}

/**
* Adds the specified group ID to the set of failed group IDs.
*
* @param group - The group ID to be added to the set of failed group IDs.
*/
#addToFailedGroup(group: string): void {
this.#failedGroupIds.add(group);
}

/**
* Processes a fail record.
*
* @param record - The record that failed.
* @param exception - The error that occurred.
*/
#processFailRecord(
record: BaseRecord,
exception: BatchProcessingError
): FailureResponse {
const data = this.toBatchType(record, this.eventType);

return this.failureHandler(data, exception);
}

/**
* Sets the current group ID for the message being processed.
*
* @param group - The group ID of the current message being processed.
*/
#setCurrentGroup(group?: string): void {
this.#currentGroupId = group;
}
}

export { SqsFifoPartialProcessor };
12 changes: 12 additions & 0 deletions packages/batch/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ class SqsFifoShortCircuitError extends BatchProcessingError {
}
}

/**
* Error thrown by the Batch Processing utility when a previous record from
* SQS FIFO queue message group fails processing.
*/
class SqsFifoMessageGroupShortCircuitError extends BatchProcessingError {
public constructor() {
super('A previous record from this message group failed processing');
this.name = 'SqsFifoMessageGroupShortCircuitError';
}
}

/**
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected
* batch type.
Expand All @@ -56,5 +67,6 @@ export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
SqsFifoMessageGroupShortCircuitError,
UnexpectedBatchTypeError,
};
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
SqsFifoMessageGroupShortCircuitError,
UnexpectedBatchTypeError,
} from './errors.js';
export { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
Expand Down
34 changes: 30 additions & 4 deletions packages/batch/src/processPartialResponseSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,42 @@ import type {
* });
* ```
*
* When working with SQS FIFO queues, we will stop processing at the first failure
* and mark unprocessed messages as failed to preserve ordering. However, if you want to
* continue processing messages from different group IDs, you can enable the `skipGroupOnError`
* option for seamless processing of messages from various group IDs.
*
* @example
* ```typescript
* import {
* SqsFifoPartialProcessor,
* processPartialResponseSync,
* } from '@aws-lambda-powertools/batch';
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
*
* const processor = new SqsFifoPartialProcessor();
*
* const recordHandler = async (record: SQSRecord): Promise<void> => {
* const payload = JSON.parse(record.body);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponseSync(event, recordHandler, processor, {
* context,
* skipGroupOnError: true
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Sync function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
* @param options Batch processing options
* @param options Batch processing options, which can vary with chosen batch processor implementation
*/
const processPartialResponseSync = (
const processPartialResponseSync = <T extends BasePartialBatchProcessor>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
processor: T,
options?: BatchProcessingOptions<T>
): PartialItemFailureResponse => {
if (!event.Records || !Array.isArray(event.Records)) {
throw new UnexpectedBatchTypeError();
Expand Down
13 changes: 11 additions & 2 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ import type {
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';

/**
* Options for batch processing
*
* @template T The type of the batch processor, defaults to BasePartialBatchProcessor
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError The option to group on error during processing
*/
type BatchProcessingOptions = {
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
* The context object provided by the AWS Lambda runtime. When provided,
* it's made available to the handler function you specify
*/
context: Context;
context?: Context;
/**
* This option is only available for SqsFifoPartialProcessor.
* If true skip the group on error during processing.
*/
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
};

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/batch/tests/helpers/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
} from 'aws-lambda';
import { randomInt, randomUUID } from 'node:crypto';

const sqsRecordFactory = (body: string): SQSRecord => {
const sqsRecordFactory = (body: string, messageGroupId?: string): SQSRecord => {
return {
messageId: randomUUID(),
receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a',
Expand All @@ -15,6 +15,7 @@ const sqsRecordFactory = (body: string): SQSRecord => {
SentTimestamp: '1545082649183',
SenderId: 'AIDAIENQZJOLO23YVJ4VO',
ApproximateFirstReceiveTimestamp: '1545082649185',
...(messageGroupId ? { MessageGroupId: messageGroupId } : {}),
},
messageAttributes: {},
md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3',
Expand Down
Loading

0 comments on commit a615c24

Please sign in to comment.