Skip to content

Commit

Permalink
feat: improve caching - adds TTL, cacheTransport and cacheGetKey (#739)
Browse files Browse the repository at this point in the history
* feat: add cacheTTL option
* feat: add `cacheGetKey` and `cacheTransport` options
* refactor: move default cache implementation to the separate file
* test: cache tests
---------

Signed-off-by: Ilya Amelevich <ilya.amelevich@gmail.com>
  • Loading branch information
iamelevich committed May 24, 2023
1 parent 0aecea7 commit dc9be53
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 59 deletions.
50 changes: 50 additions & 0 deletions lib/cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Simple in-memory cache implementation
* @class MemoryCache
* @property {Map} cache Cache map
*/
class MemoryCache {
constructor () {
this.cache = new Map();
}

/**
* Get cache value by key
* @param {string} key Cache key
* @return {any} Response from cache
*/
get (key) {
const cached = this.cache.get(key);
if (cached) {
if (cached.expiresAt > Date.now() || cached.expiresAt === 0) {
return cached.value;
}
this.cache.delete(key);
}
return undefined;
}

/**
* Set cache key with value and ttl
* @param {string} key Cache key
* @param {any} value Value to cache
* @param {number} ttl Time to live in milliseconds
* @return {void}
*/
set (key, value, ttl) {
this.cache.set(key, {
expiresAt: ttl,
value
});
}

/**
* Clear cache
* @returns {void}
*/
flush () {
this.cache.clear();
}
}

module.exports = exports = MemoryCache;
72 changes: 57 additions & 15 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const EventEmitter = require('events');
const Status = require('./status');
const Semaphore = require('./semaphore');
const MemoryCache = require('./cache');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand All @@ -14,7 +15,6 @@ const FALLBACK_FUNCTION = Symbol('fallback');
const STATUS = Symbol('status');
const NAME = Symbol('name');
const GROUP = Symbol('group');
const CACHE = new WeakMap();
const ENABLED = Symbol('Enabled');
const WARMING_UP = Symbol('warming-up');
const VOLUME_THRESHOLD = Symbol('volume-threshold');
Expand Down Expand Up @@ -86,6 +86,15 @@ Please use options.errorThresholdPercentage`;
* has been cached that value will be returned for every subsequent execution:
* the cache can be cleared using `clearCache`. (The metrics `cacheHit` and
* `cacheMiss` reflect cache activity.) Default: false
* @param {Number} options.cacheTTL the time to live for the cache
* in milliseconds. Set 0 for infinity cache. Default: 0 (no TTL)
* @param {Function} options.cacheGetKey function that returns the key to use
* when caching the result of the circuit's fire.
* Better to use custom one, because `JSON.stringify` is not good
* from performance perspective.
* Default: `(...args) => JSON.stringify(args)`
* @param {CacheTransport} options.cacheTransport custom cache transport
* should implement `get`, `set` and `flush` methods.
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
Expand Down Expand Up @@ -146,6 +155,24 @@ class CircuitBreaker extends EventEmitter {
? options.capacity
: Number.MAX_SAFE_INTEGER;
this.options.errorFilter = options.errorFilter || (_ => false);
this.options.cacheTTL = options.cacheTTL ?? 0;
this.options.cacheGetKey = options.cacheGetKey ??
((...args) => JSON.stringify(args));

// Set default cache transport if not provided
if (this.options.cache) {
if (this.options.cacheTransport === undefined) {
this.options.cacheTransport = new MemoryCache();
} else if (typeof this.options.cacheTransport !== 'object' ||
!this.options.cacheTransport.get ||
!this.options.cacheTransport.set ||
!this.options.cacheTransport.flush
) {
throw new TypeError(
'options.cacheTransport should be an object with `get`, `set` and `flush` methods'
);
}
}

this.semaphore = new Semaphore(this.options.capacity);

Expand Down Expand Up @@ -275,9 +302,6 @@ class CircuitBreaker extends EventEmitter {
this.close();
}
});
if (this.options.cache) {
CACHE.set(this, undefined);
}

// Prepopulate the State of the Breaker
if (this[SHUTDOWN]) {
Expand Down Expand Up @@ -365,6 +389,9 @@ class CircuitBreaker extends EventEmitter {
}
this.status.shutdown();
this[STATE] = SHUTDOWN;

// clear cache on shutdown
this.clearCache();
}

/**
Expand Down Expand Up @@ -557,22 +584,29 @@ class CircuitBreaker extends EventEmitter {
}
const args = Array.prototype.slice.call(rest);

// Need to create variable here to prevent extra calls if cache is disabled
let cacheKey = '';

/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
* @type {any} the arguments passed to the fired function
*/
this.emit('fire', args);

if (CACHE.get(this) !== undefined) {
/**
* Emitted when the circuit breaker is using the cache
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHit');
return CACHE.get(this);
} else if (this.options.cache) {
// If cache is enabled, check if we have a cached value
if (this.options.cache) {
cacheKey = this.options.cacheGetKey.apply(this, rest);
const cached = this.options.cacheTransport.get(cacheKey);
if (cached) {
/**
* Emitted when the circuit breaker is using the cache
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHit');
return cached;
}
/**
* Emitted when the circuit breaker does not find a value in
* the cache, but the cache option is enabled.
Expand Down Expand Up @@ -649,7 +683,13 @@ class CircuitBreaker extends EventEmitter {
this.semaphore.release();
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
this.options.cacheTransport.set(
cacheKey,
promise,
this.options.cacheTTL > 0
? Date.now() + this.options.cacheTTL
: 0
);
}
}
})
Expand Down Expand Up @@ -686,7 +726,9 @@ class CircuitBreaker extends EventEmitter {
* @returns {void}
*/
clearCache () {
CACHE.set(this, undefined);
if (this.options.cache) {
this.options.cacheTransport.flush();
}
}

/**
Expand Down
170 changes: 170 additions & 0 deletions test/cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
'use strict';

const test = require('tape');
const CircuitBreaker = require('../');
const common = require('./common');

const passFail = common.passFail;

test('Using cache', t => {
t.plan(9);
const expected = 34;
const options = {
cache: true
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});

test('Using cache with TTL', t => {
t.plan(12);
const expected = 34;
const options = {
cache: true,
cacheTTL: 100
};
const breaker = new CircuitBreaker(passFail, options);

return breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
// wait 100ms for the cache to expire
.then(() => new Promise(resolve => setTimeout(resolve, 100)))
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 2, 'did not emit miss');
t.equals(stats.fires, 3, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(t.end)
.catch(t.fail);
});

test('Using cache with custom get cache key', t => {
t.plan(9);
const expected = 34;
const options = {
cache: true,
cacheGetKey: x => `key-${x}`
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});

test('Using cache with custom transport', t => {
t.plan(9);
const expected = 34;
const cache = new Map();
const options = {
cache: true,
cacheTransport: {
get: key => cache.get(key),
set: (key, value) => cache.set(key, value),
flush: () => cache.clear()
}
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});
Loading

0 comments on commit dc9be53

Please sign in to comment.