Skip to content

Commit

Permalink
check message value exists before processing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav committed Sep 27, 2024
1 parent 803bd40 commit 1cd3bc8
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions services/libs/queue/src/vendors/kafka/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ export class KafkaQueueService extends LoggerBase implements IQueue {

this.log.trace({ topic: queueConf.name }, 'Subscribed to topic! Starting the consmer...')
await consumer.run({
eachMessage: async ({ message }: EachMessagePayload) => {
if (this.isAvailable(maxConcurrentMessageProcessing)) {
eachMessage: async ({ message, topic }: EachMessagePayload) => {
if (message && message.value && this.isAvailable(maxConcurrentMessageProcessing)) {
const now = performance.now()

this.log.trace(
Expand All @@ -243,6 +243,11 @@ export class KafkaQueueService extends LoggerBase implements IQueue {
} finally {
this.removeJob()
}
} else if (
this.isAvailable(maxConcurrentMessageProcessing) &&
(!message || !message.value)
) {
this.log.debug({ message, topic }, 'Received empty message, skipping...')
} else {
this.log.debug('Processor is busy, skipping message...')
}
Expand Down

0 comments on commit 1cd3bc8

Please sign in to comment.