Skip to content

Commit

Permalink
Use unique message key in Azure ServiceBus PubSub (dapr#1898)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Duss <andy.duss@storable.com>
  • Loading branch information
jjcollinge authored and Andrew Duss committed Aug 18, 2022
1 parent 35febf1 commit 6c61583
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions internal/component/azure/servicebus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package servicebus

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -32,7 +33,7 @@ type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) er
type Subscription struct {
entity string
mu sync.RWMutex
activeMessages map[string]*azservicebus.ReceivedMessage
activeMessages map[int64]*azservicebus.ReceivedMessage
activeMessagesChan chan struct{}
receiver *azservicebus.Receiver
timeout time.Duration
Expand All @@ -57,7 +58,7 @@ func NewSubscription(
ctx, cancel := context.WithCancel(parentCtx)
s := &Subscription{
entity: entity,
activeMessages: make(map[string]*azservicebus.ReceivedMessage),
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
activeMessagesChan: make(chan struct{}, maxActiveMessages),
timeout: time.Duration(timeoutInSec) * time.Second,
logger: logger,
Expand Down Expand Up @@ -175,7 +176,13 @@ func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int
s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan))
// s.logger.Debugf("Message body: %s", string(msg.Body))

s.addActiveMessage(msg)
if err = s.addActiveMessage(msg); err != nil {
// If we cannot add the message then sequence number is not set, this must
// be a bug in the Azure Service Bus SDK so we will log the error and not
// handle the message. The message will eventually be retried until fixed.
s.logger.Errorf("Error adding message: %s", err.Error())
continue
}

s.logger.Debugf("Processing received message: %s", msg.MessageID)
s.handleAsync(s.ctx, msg, handler)
Expand Down Expand Up @@ -217,7 +224,7 @@ func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv
}

// Remove the message from the map of active ones
s.removeActiveMessage(msg.MessageID)
s.removeActiveMessage(msg.MessageID, *msg.SequenceNumber)

// Remove an entry from activeMessageChan to allow processing more messages
<-s.activeMessagesChan
Expand Down Expand Up @@ -315,16 +322,20 @@ func (s *Subscription) CompleteMessage(ctx context.Context, m *azservicebus.Rece
}
}

func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) {
s.logger.Debugf("Adding message %s to active messages on %s", m.MessageID, s.entity)
func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) error {
if m.SequenceNumber == nil {
return fmt.Errorf("message sequence number is nil")
}
s.logger.Debugf("Adding message %s with sequence number %d to active messages on %s", m.MessageID, *m.SequenceNumber, s.entity)
s.mu.Lock()
s.activeMessages[m.MessageID] = m
s.activeMessages[*m.SequenceNumber] = m
s.mu.Unlock()
return nil
}

func (s *Subscription) removeActiveMessage(messageID string) {
s.logger.Debugf("Removing message %s from active messages on %s", messageID, s.entity)
func (s *Subscription) removeActiveMessage(messageID string, messageKey int64) {
s.logger.Debugf("Removing message %s with sequence number %d from active messages on %s", messageID, messageKey, s.entity)
s.mu.Lock()
delete(s.activeMessages, messageID)
delete(s.activeMessages, messageKey)
s.mu.Unlock()
}

0 comments on commit 6c61583

Please sign in to comment.