From 1cd3bc899a6ead3c8a35e80d3bb6e2a73795cb94 Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 27 Sep 2024 15:43:55 +0200 Subject: [PATCH] check message value exists before processing messages --- services/libs/queue/src/vendors/kafka/client.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 2bf90174d..f822b32ec 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -221,8 +221,8 @@ export class KafkaQueueService extends LoggerBase implements IQueue { this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! Starting the consmer...') await consumer.run({ - eachMessage: async ({ message }: EachMessagePayload) => { - if (this.isAvailable(maxConcurrentMessageProcessing)) { + eachMessage: async ({ message, topic }: EachMessagePayload) => { + if (message && message.value && this.isAvailable(maxConcurrentMessageProcessing)) { const now = performance.now() this.log.trace( @@ -243,6 +243,11 @@ export class KafkaQueueService extends LoggerBase implements IQueue { } finally { this.removeJob() } + } else if ( + this.isAvailable(maxConcurrentMessageProcessing) && + (!message || !message.value) + ) { + this.log.debug({ message, topic }, 'Received empty message, skipping...') } else { this.log.debug('Processor is busy, skipping message...') }