Skip to content

Commit

Permalink
Clear rpc timeout (#61)
Browse files Browse the repository at this point in the history
Just clean the `setTimeout` we are doing for rpc in-case we already
received the message before the timeout.
  • Loading branch information
yosiat committed May 22, 2023
1 parent 84b7967 commit 3da6fa8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "arnavmq",
"version": "0.15.1",
"version": "0.15.2",
"description": "ArnavMQ is a RabbitMQ wrapper",
"keywords": [
"rabbitmq",
Expand Down
32 changes: 22 additions & 10 deletions src/modules/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ class ProducerError extends Error {

class Producer {
constructor(connection) {
this.amqpRPCQueues = {};
this._connection = connection;

/**
* Map of rpc queues
*
* [queue: string] -> [correlationId: string] -> {responsePromise, timeoutId}
*/
this.amqpRPCQueues = {};
}

set connection(value) {
Expand All @@ -45,10 +51,10 @@ class Producer {
return (msg) => {
// check the correlation ID sent by the initial message using RPC
const { correlationId } = msg.properties;
const responsePromise = rpcQueue[correlationId];
const waiter = rpcQueue[correlationId];

// On timeout the waiter is deleted, so we need to handle the race when the response arrives too late.
if (responsePromise === undefined) {
if (waiter === undefined) {
const error = new Error(`Receiving RPC message from previous session: callback no more in memory. ${queue}`);
this._connection.config.logger.warn({
message: `${loggerAlias} ${error.message}`,
Expand All @@ -66,10 +72,11 @@ class Producer {
});

try {
responsePromise.resolve(parsers.in(msg));
waiter.responsePromise.resolve(parsers.in(msg));
} catch (e) {
responsePromise.reject(new ProducerError(e));
waiter.responsePromise.reject(new ProducerError(e));
} finally {
clearTimeout(waiter.timeoutId);
delete rpcQueue[correlationId];
}
};
Expand Down Expand Up @@ -138,10 +145,15 @@ class Producer {
*/
prepareTimeoutRpc(queue, corrId, time) {
const producer = this;
setTimeout(() => {
const rpcCallback = producer.amqpRPCQueues[queue][corrId];
if (rpcCallback) {
rpcCallback.reject(new Error(ERRORS.TIMEOUT));
let waiter = producer.amqpRPCQueues[queue][corrId];
if (!waiter) {
return;
}

waiter.timeoutId = setTimeout(() => {
waiter = producer.amqpRPCQueues[queue][corrId];
if (waiter) {
waiter.responsePromise.reject(new Error(ERRORS.TIMEOUT));
delete producer.amqpRPCQueues[queue][corrId];
}
}, time);
Expand All @@ -168,7 +180,7 @@ class Producer {

// deferred promise that will resolve when response is received
const responsePromise = pDefer();
this.amqpRPCQueues[queue][corrId] = responsePromise;
this.amqpRPCQueues[queue][corrId] = { responsePromise, timeoutId: null };

await this.publishOrSendToQueue(queue, msg, options);

Expand Down

0 comments on commit 3da6fa8

Please sign in to comment.