Skip to content

Commit

Permalink
feat(batch): Implementation of base batch processing classes (#1588)
Browse files Browse the repository at this point in the history
* chore: init workspace

* chore: init workspace

* Initial base class implementation

* Added BatchProcessor implementation, attempted fix for async

* Added unit tests

* Refactoring unit tests

* Lint fix, updated docstrings

* Added response and identifier typings

* test(idempotency): improve integration tests for utility (#1591)

* docs: new name

* chore: rename e2e files

* tests(idempotency): expand integration tests

* chore(idempotency): remove unreachable code

* Removed unnecessary type casting

* Moved exports for handlers and factories

* Updated imports, refactored randomization in factories

* Refactored EventType to be const instead of enum

* Refactored and added documentation for errors

* Removed debugging line

* chore(ci): add canary to layer deployment (#1593)

* docs(idempotency): write utility docs (#1592)

* docs: base docs

* wip

* chore: added paths to snippets tsconfig

* chore: added page to docs menu

* docs(idempotency): utility docs

* highlights

* chore: remove CDK mention

* build(internal): bump semver from 5.7.1 to 5.7.2 (#1594)

Bumps [semver](https://github.com/npm/node-semver) from 5.7.1 to 5.7.2.
- [Release notes](https://github.com/npm/node-semver/releases)
- [Changelog](https://github.com/npm/node-semver/blob/v5.7.2/CHANGELOG.md)
- [Commits](npm/node-semver@v5.7.1...v5.7.2)

---
updated-dependencies:
- dependency-name: semver
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(idempotency): mark the utility ready public beta (#1595)

* chore(idempotency): mark utility as public beta

* chore: manually increment version in commons

* docs(internal): update AWS SDK links to new docs (#1597)

* chore(maintenance): remove parameters utility from layer bundling and layers e2e tests (#1599)

* remove parameter from e2e tests

* remove parameters from canary stack as well

* chore(release): v1.11.1 [skip ci]

* fix canary deploy in ci with correct workspace name (#1601)

* chore: update layer ARN on documentation

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
Co-authored-by: Alexander Schueren <amelnyk@amazon.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Release bot[bot] <aws-devax-open-source@amazon.com>
  • Loading branch information
6 people committed Jul 15, 2023
1 parent 1c8c4aa commit 76bd7b8
Show file tree
Hide file tree
Showing 11 changed files with 969 additions and 8 deletions.
6 changes: 5 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 161 additions & 0 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Process batch and partially report failed items
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import {
BasePartialProcessor,
BatchProcessingError,
DATA_CLASS_MAPPING,
DEFAULT_RESPONSE,
EventSourceDataClassTypes,
EventType,
ItemIdentifier,
BatchResponse,
} from '.';

abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public COLLECTOR_MAPPING;

public batchResponse: BatchResponse;

public eventType: keyof typeof EventType;

/**
* Initializes base batch processing class
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event
*/
public constructor(eventType: keyof typeof EventType) {
super();
this.eventType = eventType;
this.batchResponse = DEFAULT_RESPONSE;
this.COLLECTOR_MAPPING = {
[EventType.SQS]: () => this.collectSqsFailures(),
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
};
}

/**
* Report messages to be deleted in case of partial failures
*/
public clean(): void {
if (!this.hasMessagesToReport()) {
return;
}

if (this.entireBatchFailed()) {
throw new BatchProcessingError(
'All records failed processing. ' +
this.exceptions.length +
' individual errors logged separately below.',
this.exceptions
);
}

const messages: ItemIdentifier[] = this.getMessagesToReport();
this.batchResponse = { batchItemFailures: messages };
}

/**
* Collects identifiers of failed items for a DynamoDB stream
* @returns list of identifiers for failed items
*/
public collectDynamoDBFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as DynamoDBRecord).dynamodb?.SequenceNumber;
if (msgId) {
failures.push({ itemIdentifier: msgId });
}
}

return failures;
}

/**
* Collects identifiers of failed items for a Kinesis stream
* @returns list of identifiers for failed items
*/
public collectKinesisFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as KinesisStreamRecord).kinesis.sequenceNumber;
failures.push({ itemIdentifier: msgId });
}

return failures;
}

/**
* Collects identifiers of failed items for an SQS batch
* @returns list of identifiers for failed items
*/
public collectSqsFailures(): ItemIdentifier[] {
const failures: ItemIdentifier[] = [];

for (const msg of this.failureMessages) {
const msgId = (msg as SQSRecord).messageId;
failures.push({ itemIdentifier: msgId });
}

return failures;
}

/**
* Determines whether all records in a batch failed to process
* @returns true if all records resulted in exception results
*/
public entireBatchFailed(): boolean {
return this.exceptions.length == this.records.length;
}

/**
* Collects identifiers for failed batch items
* @returns formatted messages to use in batch deletion
*/
public getMessagesToReport(): ItemIdentifier[] {
return this.COLLECTOR_MAPPING[this.eventType]();
}

/**
* Determines if any records failed to process
* @returns true if any records resulted in exception
*/
public hasMessagesToReport(): boolean {
if (this.failureMessages.length != 0) {
return true;
}

// console.debug('All ' + this.successMessages.length + ' records successfully processed');

return false;
}

/**
* Remove results from previous execution
*/
public prepare(): void {
this.successMessages.length = 0;
this.failureMessages.length = 0;
this.exceptions.length = 0;
this.batchResponse = DEFAULT_RESPONSE;
}

/**
* @returns Batch items that failed processing, if any
*/
public response(): BatchResponse {
return this.batchResponse;
}

public toBatchType(
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType
): SQSRecord | KinesisStreamRecord | DynamoDBRecord {
return DATA_CLASS_MAPPING[eventType](record);
}
}

export { BasePartialBatchProcessor };
120 changes: 120 additions & 0 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* Abstract class for batch processors
*/
import {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
ResultType,
SuccessResponse,
} from '.';

abstract class BasePartialProcessor {
public exceptions: Error[];

public failureMessages: EventSourceDataClassTypes[];

public handler: CallableFunction;

public records: BaseRecord[];

public successMessages: EventSourceDataClassTypes[];

/**
* Initializes base processor class
*/
public constructor() {
this.successMessages = [];
this.failureMessages = [];
this.exceptions = [];
this.records = [];
this.handler = new Function();
}

/**
* Clean class instance after processing
*/
public abstract clean(): void;

/**
* Keeps track of batch records that failed processing
* @param record record that failed processing
* @param exception exception that was thrown
* @returns FailureResponse object with ["fail", exception, original record]
*/
public failureHandler(
record: EventSourceDataClassTypes,
exception: Error
): FailureResponse {
const entry: FailureResponse = ['fail', exception.message, record];
// console.debug('Record processing exception: ' + exception.message);
this.exceptions.push(exception);
this.failureMessages.push(record);

return entry;
}

/**
* Prepare class instance before processing
*/
public abstract prepare(): void;

/**
* Call instance's handler for each record
* @returns List of processed records
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
for (const record of this.records) {
processedRecords.push(await this.processRecord(record));
}

this.clean();

return processedRecords;
}

/**
* Process a record with the handler
* @param record Record to be processed
*/
public abstract processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse>;

/**
* Set class instance attributes before execution
* @param records List of records to be processed
* @param handler CallableFunction to process entries of "records"
* @returns this object
*/
public register(
records: BaseRecord[],
handler: CallableFunction
): BasePartialProcessor {
this.records = records;
this.handler = handler;

return this;
}

/**
* Keeps track of batch records that were processed successfully
* @param record record that succeeded processing
* @param result result from record handler
* @returns SuccessResponse object with ["success", result, original record]
*/
public successHandler(
record: EventSourceDataClassTypes,
result: ResultType
): SuccessResponse {
const entry: SuccessResponse = ['success', result, record];
this.successMessages.push(record);

return entry;
}
}

export { BasePartialProcessor };
30 changes: 29 additions & 1 deletion packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
class BatchProcessor {}
/**
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB
*/
import {
BasePartialBatchProcessor,
BaseRecord,
FailureResponse,
SuccessResponse,
} from '.';

class BatchProcessor extends BasePartialBatchProcessor {
/**
* Process a record with instance's handler
* @param record Batch record to be processed
* @returns response of success or failure
*/
public async processRecord(
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
try {
const data = this.toBatchType(record, this.eventType);
const result = await this.handler(data);

return this.successHandler(record, result);
} catch (e) {
return this.failureHandler(record, e as Error);
}
}
}

export { BatchProcessor };
25 changes: 25 additions & 0 deletions packages/batch/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Constants for batch processor classes
*/
import { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda';
import type { BatchResponse, EventSourceDataClassTypes } from '.';

const EventType = {
SQS: 'SQS',
KinesisDataStreams: 'KinesisDataStreams',
DynamoDBStreams: 'DynamoDBStreams',
} as const;

const DEFAULT_RESPONSE: BatchResponse = {
batchItemFailures: [],
};

const DATA_CLASS_MAPPING = {
[EventType.SQS]: (record: EventSourceDataClassTypes) => record as SQSRecord,
[EventType.KinesisDataStreams]: (record: EventSourceDataClassTypes) =>
record as KinesisStreamRecord,
[EventType.DynamoDBStreams]: (record: EventSourceDataClassTypes) =>
record as DynamoDBRecord,
};

export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
Loading

0 comments on commit 76bd7b8

Please sign in to comment.