From 6c61583ea39426dc85570c0738a809ef70852d72 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Thu, 21 Jul 2022 18:13:58 +0100 Subject: [PATCH] Use unique message key in Azure ServiceBus PubSub (#1898) Signed-off-by: Andrew Duss --- .../azure/servicebus/subscription.go | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/internal/component/azure/servicebus/subscription.go b/internal/component/azure/servicebus/subscription.go index 73a2ad1826..d64ebcec53 100644 --- a/internal/component/azure/servicebus/subscription.go +++ b/internal/component/azure/servicebus/subscription.go @@ -15,6 +15,7 @@ package servicebus import ( "context" + "fmt" "sync" "time" @@ -32,7 +33,7 @@ type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) er type Subscription struct { entity string mu sync.RWMutex - activeMessages map[string]*azservicebus.ReceivedMessage + activeMessages map[int64]*azservicebus.ReceivedMessage activeMessagesChan chan struct{} receiver *azservicebus.Receiver timeout time.Duration @@ -57,7 +58,7 @@ func NewSubscription( ctx, cancel := context.WithCancel(parentCtx) s := &Subscription{ entity: entity, - activeMessages: make(map[string]*azservicebus.ReceivedMessage), + activeMessages: make(map[int64]*azservicebus.ReceivedMessage), activeMessagesChan: make(chan struct{}, maxActiveMessages), timeout: time.Duration(timeoutInSec) * time.Second, logger: logger, @@ -175,7 +176,13 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan)) // s.logger.Debugf("Message body: %s", string(msg.Body)) - s.addActiveMessage(msg) + if err = s.addActiveMessage(msg); err != nil { + // If we cannot add the message then sequence number is not set, this must + // be a bug in the Azure Service Bus SDK so we will log the error and not + // handle the message. The message will eventually be retried until fixed. + s.logger.Errorf("Error adding message: %s", err.Error()) + continue + } s.logger.Debugf("Processing received message: %s", msg.MessageID) s.handleAsync(s.ctx, msg, handler) @@ -217,7 +224,7 @@ func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv } // Remove the message from the map of active ones - s.removeActiveMessage(msg.MessageID) + s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber) // Remove an entry from activeMessageChan to allow processing more messages <-s.activeMessagesChan @@ -315,16 +322,20 @@ func (s *Subscription) CompleteMessage(ctx context.Context, m *azservicebus.Rece } } -func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) { - s.logger.Debugf("Adding message %s to active messages on %s", m.MessageID, s.entity) +func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error { + if m.SequenceNumber == nil { + return fmt.Errorf("message sequence number is nil") + } + s.logger.Debugf("Adding message %s with sequence number %d to active messages on %s", m.MessageID, *m.SequenceNumber, s.entity) s.mu.Lock() - s.activeMessages[m.MessageID] = m + s.activeMessages[*m.SequenceNumber] = m s.mu.Unlock() + return nil } -func (s *Subscription) removeActiveMessage(messageID string) { - s.logger.Debugf("Removing message %s from active messages on %s", messageID, s.entity) +func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) { + s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity) s.mu.Lock() - delete(s.activeMessages, messageID) + delete(s.activeMessages, messageKey) s.mu.Unlock() }