Skip to content

Commit

Permalink
feat(idempotency): add local cache to BasePersistenceLayer (#1396)
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamorosi authored Apr 7, 2023
1 parent 31ae936 commit 2013ff2
Show file tree
Hide file tree
Showing 9 changed files with 673 additions and 36 deletions.
2 changes: 2 additions & 0 deletions packages/idempotency/src/IdempotencyConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class IdempotencyConfig {
public expiresAfterSeconds: number;
public hashFunction: string;
public lambdaContext?: Context;
public maxLocalCacheSize: number;
public payloadValidationJmesPath?: string;
public throwOnNoIdempotencyKey: boolean;
public useLocalCache: boolean;
Expand All @@ -16,6 +17,7 @@ class IdempotencyConfig {
this.throwOnNoIdempotencyKey = config.throwOnNoIdempotencyKey ?? false;
this.expiresAfterSeconds = config.expiresAfterSeconds ?? 3600; // 1 hour default
this.useLocalCache = config.useLocalCache ?? false;
this.maxLocalCacheSize = config.maxLocalCacheSize ?? 1000;
this.hashFunction = config.hashFunction ?? 'md5';
this.lambdaContext = config.lambdaContext;
}
Expand Down
82 changes: 68 additions & 14 deletions packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import type {
import { EnvironmentVariablesService } from '../config';
import { IdempotencyRecord } from './IdempotencyRecord';
import { BasePersistenceLayerInterface } from './BasePersistenceLayerInterface';
import { IdempotencyValidationError } from '../Exceptions';
import { IdempotencyItemAlreadyExistsError, IdempotencyValidationError } from '../Exceptions';
import { LRUCache } from './LRUCache';

abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
public idempotencyKeyPrefix: string;
private cache?: LRUCache<string, IdempotencyRecord>;
private configured: boolean = false;
// envVarsService is always initialized in the constructor
private envVarsService!: EnvironmentVariablesService;
Expand All @@ -25,7 +27,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
private useLocalCache: boolean = false;
private validationKeyJmesPath?: string;

public constructor() {
public constructor() {
this.envVarsService = new EnvironmentVariablesService();
this.idempotencyKeyPrefix = this.getEnvVarsService().getFunctionName();
}
Expand Down Expand Up @@ -55,7 +57,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
this.throwOnNoIdempotencyKey = idempotencyConfig?.throwOnNoIdempotencyKey || false;
this.eventKeyJmesPath = idempotencyConfig.eventKeyJmesPath;
this.expiresAfterSeconds = idempotencyConfig.expiresAfterSeconds; // 1 hour default
// TODO: Add support for local cache
this.useLocalCache = idempotencyConfig.useLocalCache;
if (this.useLocalCache) {
this.cache = new LRUCache({ maxSize: idempotencyConfig.maxLocalCacheSize });
}
this.hashFunction = idempotencyConfig.hashFunction;
}

Expand All @@ -64,13 +69,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
*
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
*/
public async deleteRecord(data: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
public async deleteRecord(data: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.EXPIRED
});

await this._deleteRecord(idempotencyRecord);

this.deleteFromCache(idempotencyRecord.idempotencyKey);
}

/**
Expand All @@ -81,7 +88,15 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
public async getRecord(data: Record<string, unknown>): Promise<IdempotencyRecord> {
const idempotencyKey = this.getHashedIdempotencyKey(data);

const cachedRecord = this.getFromCache(idempotencyKey);
if (cachedRecord) {
this.validatePayload(data, cachedRecord);

return cachedRecord;
}

const record = await this._getRecord(idempotencyKey);
this.saveToCache(record);
this.validatePayload(data, record);

return record;
Expand All @@ -97,7 +112,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
* @param remainingTimeInMillis - the remaining time left in the lambda execution context
*/
public async saveInProgress(data: Record<string, unknown>, remainingTimeInMillis?: number): Promise<void> {
public async saveInProgress(data: Record<string, unknown>, remainingTimeInMillis?: number): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.INPROGRESS,
Expand All @@ -113,6 +128,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
);
}

if (this.getFromCache(idempotencyRecord.idempotencyKey)) {
throw new IdempotencyItemAlreadyExistsError();
}

await this._putRecord(idempotencyRecord);
}

Expand All @@ -123,7 +142,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
* @param data - the data payload that will be hashed to create the hash portion of the idempotency key
* @param result - the result of the successfully completed function
*/
public async saveSuccess(data: Record<string, unknown>, result: Record<string, unknown>): Promise<void> {
public async saveSuccess(data: Record<string, unknown>, result: Record<string, unknown>): Promise<void> {
const idempotencyRecord = new IdempotencyRecord({
idempotencyKey: this.getHashedIdempotencyKey(data),
status: IdempotencyRecordStatus.COMPLETED,
Expand All @@ -133,23 +152,33 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
});

await this._updateRecord(idempotencyRecord);

this.saveToCache(idempotencyRecord);
}

protected abstract _deleteRecord(record: IdempotencyRecord): Promise<void>;
protected abstract _getRecord(idempotencyKey: string): Promise<IdempotencyRecord>;
protected abstract _putRecord(record: IdempotencyRecord): Promise<void>;
protected abstract _updateRecord(record: IdempotencyRecord): Promise<void>;

private deleteFromCache(idempotencyKey: string): void {
if (!this.useLocalCache) return;
// Delete from local cache if it exists
if (this.cache?.has(idempotencyKey)) {
this.cache?.remove(idempotencyKey);
}
}

/**
* Generates a hash of the data and returns the digest of that hash
*
* @param data the data payload that will generate the hash
* @returns the digest of the generated hash
*/
private generateHash(data: string): string{
private generateHash(data: string): string {
const hash: Hash = createHash(this.hashFunction);
hash.update(data);

return hash.digest('base64');
}

Expand All @@ -168,10 +197,21 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
*/
private getExpiryTimestamp(): number {
const currentTime: number = Date.now() / 1000;

return currentTime + this.expiresAfterSeconds;
}

private getFromCache(idempotencyKey: string): IdempotencyRecord | undefined {
if (!this.useLocalCache) return undefined;
const cachedRecord = this.cache?.get(idempotencyKey);
if (cachedRecord) {
// if record is not expired, return it
if (!cachedRecord.isExpired()) return cachedRecord;
// if record is expired, delete it from cache
this.deleteFromCache(idempotencyKey);
}
}

/**
* Generates the idempotency key used to identify records in the persistence store.
*
Expand All @@ -182,14 +222,14 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
if (this.eventKeyJmesPath) {
data = search(data, this.eventKeyJmesPath);
}

if (BasePersistenceLayer.isMissingIdempotencyKey(data)) {
if (this.throwOnNoIdempotencyKey) {
throw new Error('No data found to create a hashed idempotency_key');
}
console.warn(`No value found for idempotency_key. jmespath: ${this.eventKeyJmesPath}`);
}

return `${this.idempotencyKeyPrefix}#${this.generateHash(JSON.stringify(data))}`;
}

Expand All @@ -204,7 +244,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
// Therefore, the assertion is safe.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
data = search(data, this.validationKeyJmesPath!);

return this.generateHash(JSON.stringify(data));
}

Expand All @@ -223,6 +263,20 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
return !data;
}

/**
* Save record to local cache except for when status is `INPROGRESS`.
*
* We can't cache `INPROGRESS` records because we have no way to reflect updates
* that might happen to the record outside of the execution context of the function.
*
* @param record - record to save
*/
private saveToCache(record: IdempotencyRecord): void {
if (!this.useLocalCache) return;
if (record.getStatus() === IdempotencyRecordStatus.INPROGRESS) return;
this.cache?.add(record.idempotencyKey, record);
}

private validatePayload(data: Record<string, unknown>, record: IdempotencyRecord): void {
if (this.payloadValidationEnabled) {
const hashedPayload: string = this.getHashedPayload(data);
Expand Down
4 changes: 2 additions & 2 deletions packages/idempotency/src/persistence/IdempotencyRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class IdempotencyRecord {
public responseData?: Record<string, unknown>;
private status: IdempotencyRecordStatus;

public constructor(config: IdempotencyRecordOptions) {
public constructor(config: IdempotencyRecordOptions) {
this.idempotencyKey = config.idempotencyKey;
this.expiryTimestamp = config.expiryTimestamp;
this.inProgressExpiryTimestamp = config.inProgressExpiryTimestamp;
Expand All @@ -38,7 +38,7 @@ class IdempotencyRecord {
}
}

private isExpired(): boolean {
public isExpired(): boolean {
return this.expiryTimestamp !== undefined && ((Date.now() / 1000) > this.expiryTimestamp);
}
}
Expand Down
Loading

0 comments on commit 2013ff2

Please sign in to comment.