Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors code to support inheritance #198

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/server/queue/bee.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');

class BeeJob extends Job {
constructor(job) {
super(job);
}

anuragagarwal561994 marked this conversation as resolved.
Show resolved Hide resolved
async remove() {
await this._job.remove();
}

async getStatus() {
return Promise.resolve(this._job.status);
anuragagarwal561994 marked this conversation as resolved.
Show resolved Hide resolved
}

async toJSON() {
const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job;
return new JobData({id, progress, data, timestamp, stacktrace, delay});
}
}

const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed'];
const SUPPORTED_ACTIONS = ['remove'];

module.exports = class BeeQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BeeQueue.parseConfig(queueConfig);
const queue = new BeeQueue(name, options);
super(queue);
}

static parseConfig(queueConfig) {
const options = {
redis: this.parseRedisConfig(queueConfig),
isWorker: false,
getEvents: false,
sendEvents: false,
storeJobs: false,
};
const {prefix} = queueConfig;
if (prefix) options.prefix = prefix;
return options;
}

async getJob(id) {
const job = this._queue.getJob(id);
return new BeeJob(job);
}

async getJobCounts() {
const jobCounts = this._queue.checkHealth();
delete jobCounts.newestJob;
return jobCounts;
}

async getJobs(state, start, size) {
const page = {};

if (['failed', 'succeeded'].includes(state)) {
page.size = size;
} else {
page.start = start;
page.end = start + size - 1;
}

let jobs = await this._queue.getJobs(state, page);
// Filter out Bee jobs that have already been removed by the time the promise resolves
jobs = jobs.filter((job) => job);
return jobs.map((j) => new BeeJob(j));
}

async addJob(data) {
const job = await this._queue.createJob(data).save();
return new BeeJob(job);
}

isValidState(state) {
return VALID_STATES.includes(state);
}

isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}

isPaginationSupported(state) {
return state !== 'succeeded' && state !== 'failed';
}
};
101 changes: 101 additions & 0 deletions src/server/queue/bull.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
const {capitalize} = require('lodash');
const Bull = require('bull');
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');

const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed'];
const SUPPORTED_ACTIONS = ['remove', 'retry'];

class BullJob extends Job {
constructor(job) {
super(job);
}

async remove() {
await this._job.remove();
}

async retry() {
await this._job.retry();
}

async getStatus() {
return this._job.getState();
}


async toJSON() {
const {
id,
name,
data,
attemptsMade,
failedReason,
stacktrace,
returnvalue: returnValue,
timestamp,
delay,
progress
} = this._job.toJSON();
return new JobData({
id,
name,
data,
attemptsMade,
failedReason,
stacktrace,
timestamp,
delay,
progress,
returnValue,
});
}
}

module.exports = class BullQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BullQueue.parseConfig(queueConfig);
const queue = Bull(name, options);
super(queue);
}

static parseConfig(queueConfig) {
const options = {redis: this.parseRedisConfig(queueConfig)};
const {createClient, prefix} = queueConfig;
if (createClient) options.createClient = createClient;
if (prefix) options.prefix = prefix;
return options;
}

async getJob(id) {
const job = await this._queue.getJob(id);
return new BullJob(job);
}

async getJobCounts() {
return this._queue.getJobCounts();
}

async getJobs(state, start, size) {
const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1);
return jobs.map((j) => new BullJob(j));
}

async addJob(data) {
const job = await this._queue.add(data, {
removeOnComplete: false,
removeOnFail: false
});
return new BullJob(job);
}

isValidState(state) {
return VALID_STATES.includes(state);
}

isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}
};
52 changes: 6 additions & 46 deletions src/server/queue/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const _ = require('lodash');
const Bull = require('bull');
const Bee = require('bee-queue');
const BullQueue = require('./bull');
const BeeQueue = require('./bee');

class Queues {
constructor(config) {
Expand Down Expand Up @@ -38,59 +38,19 @@ class Queues {
return this._queues[queueHost][queueName];
}

const { type, name, port, host, db, password, prefix, url, redis, tls } = queueConfig;

const redisHost = { host };
if (password) redisHost.password = password;
if (port) redisHost.port = port;
if (db) redisHost.db = db;
if (tls) redisHost.tls = tls;

const isBee = type === 'bee';

const options = {
redis: redis || url || redisHost
};
if (prefix) options.prefix = prefix;

const {type} = queueConfig;
let queue;
if (isBee) {
_.extend(options, {
isWorker: false,
getEvents: false,
sendEvents: false,
storeJobs: false
});

queue = new Bee(name, options);
queue.IS_BEE = true;
if (type === 'bee') {
queue = new BeeQueue(queueConfig);
} else {
if (queueConfig.createClient) options.createClient = queueConfig.createClient;
queue = new Bull(name, options);
queue = new BullQueue(queueConfig);
}

this._queues[queueHost] = this._queues[queueHost] || {};
this._queues[queueHost][queueName] = queue;

return queue;
}

/**
* Creates and adds a job with the given `data` to the given `queue`.
*
* @param {Object} queue A bee or bull queue class
* @param {Object} data The data to be used within the job
*/
async set(queue, data) {
if (queue.IS_BEE) {
return queue.createJob(data).save();
} else {
return queue.add(data, {
removeOnComplete: false,
removeOnFail: false
});
}
}
}

module.exports = Queues;
17 changes: 17 additions & 0 deletions src/server/queue/job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module.exports = class Job {
constructor(job) {
this._job = job;
if (new.target === Job) {
throw new TypeError("Cannot construct Job instances directly");
}
}

async remove() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have these abstract methods throw a not implemented exception? This would both make it clear that these should be overridden, and avoid weird cases down the road where we instantiate a Job but avoid the constructor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, just check once. Didn't use a message because the stacktrace should already cover the method name from where the error originated.

}

async getStatus() {
}

async toJSON() {
}
};
16 changes: 16 additions & 0 deletions src/server/queue/jobData.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module.exports = class JobData {
constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) {
this.id = id;
this.name = name;
this.data = data;
this.progress = progress;
this.attemptsMade = attemptsMade;
this.returnValue = returnValue;
this.failedReason = failedReason;
this.options = {
stacktrace,
timestamp,
delay,
};
}
};
43 changes: 43 additions & 0 deletions src/server/queue/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
module.exports = class Queue {
constructor(queue) {
this._queue = queue;
if (new.target === Queue) {
throw new TypeError("Cannot construct Queue instances directly");
}
}

static parseRedisConfig({port, host, db, password, url, redis, tls}) {
const redisHost = {host};
if (password) redisHost.password = password;
if (port) redisHost.port = port;
if (db) redisHost.db = db;
if (tls) redisHost.tls = tls;
return redis || url || redisHost;
}

get redisClient() {
return this._queue.client;
}

async getJob(id) {
}

async getJobCounts() {
}

async getJobs(state, start, size) {
}

async addJob(data, options) {
}

isValidState(state) {
}

isActionSupported(action) {
}

isPaginationSupported(_state) {
return true;
}
};
16 changes: 7 additions & 9 deletions src/server/views/api/bulkAction.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
const _ = require('lodash');

const ACTIONS = ['remove', 'retry'];

function bulkAction(action) {
return async function handler(req, res) {
if (!_.includes(ACTIONS, action)) {
res.status(401).send({
error: 'unauthorized action',
details: `action ${action} not permitted`
});
}

const { queueName, queueHost } = req.params;
const {Queues} = req.app.locals;
const queue = await Queues.get(queueName, queueHost);
if (!queue) return res.status(404).send({error: 'queue not found'});
anuragagarwal561994 marked this conversation as resolved.
Show resolved Hide resolved

if (!queue.isActionSupported(action)) {
return res.status(401).send({
anuragagarwal561994 marked this conversation as resolved.
Show resolved Hide resolved
error: 'unauthorized action',
details: `queue does not support action ${action}`
});
}

const {jobs} = req.body;

try {
Expand Down
24 changes: 1 addition & 23 deletions src/server/views/api/jobRemove.js
Original file line number Diff line number Diff line change
@@ -1,23 +1 @@
async function handler(req, res) {
const { queueName, queueHost, id } = req.params;

const {Queues} = req.app.locals;
const queue = await Queues.get(queueName, queueHost);
if (!queue) return res.status(404).send({error: 'queue not found'});

const job = await queue.getJob(id);
if (!job) return res.status(404).send({error: 'job not found'});

try {
await job.remove();
return res.sendStatus(200);
} catch (e) {
const body = {
error: 'queue error',
details: e.stack
};
return res.status(500).send(body);
}
}

module.exports = handler;
module.exports = require('./performAction')('remove');
Loading