Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Some cleanup in prep for release #20932

Merged
merged 2 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk/messaging/azservicebus/admin/admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ func testTopicCreation(t *testing.T, isPremium bool) {
func TestAdminClient_TopicAndSubscription_WithFalseFilterDefaultSubscriptionRule(t *testing.T) {
adminClient, topicName := createAdminClientWithTestTopic(t)

defer deleteTopic(t, adminClient, topicName)

subscriptionName := createTestSubscriptionWithDefaultRule(
t,
adminClient,
Expand All @@ -577,13 +579,13 @@ func TestAdminClient_TopicAndSubscription_WithFalseFilterDefaultSubscriptionRule
require.Equal(t, "$Default", defaultRule.Name)
require.IsType(t, &FalseFilter{}, defaultRule.Filter)
require.Nil(t, defaultRule.Action)

defer deleteSubscription(t, adminClient, topicName, subscriptionName)
}

func TestAdminClient_TopicAndSubscription_WithCustomFilterDefaultSubscriptionRule(t *testing.T) {
adminClient, topicName := createAdminClientWithTestTopic(t)

defer deleteTopic(t, adminClient, topicName)

customSqlFilter := &SQLFilter{
Expression: "SomeProperty LIKE 'O%'",
}
Expand All @@ -604,13 +606,13 @@ func TestAdminClient_TopicAndSubscription_WithCustomFilterDefaultSubscriptionRul
require.Equal(t, "TestRule", defaultRule.Name)
require.Equal(t, customSqlFilter, defaultRule.Filter)
require.Nil(t, defaultRule.Action)

defer deleteSubscription(t, adminClient, topicName, subscriptionName)
}

func TestAdminClient_TopicAndSubscription_WithActionDefaultSubscriptionRule(t *testing.T) {
adminClient, topicName := createAdminClientWithTestTopic(t)

defer deleteTopic(t, adminClient, topicName)

ruleAction := &SQLAction{
Expression: "SET MessageID=@stringVar",
Parameters: map[string]any{
Expand All @@ -633,13 +635,13 @@ func TestAdminClient_TopicAndSubscription_WithActionDefaultSubscriptionRule(t *t
require.Equal(t, "$Default", defaultRule.Name)
require.Equal(t, ruleAction, defaultRule.Action)
require.Equal(t, defaultRule.Filter, &TrueFilter{})

defer deleteSubscription(t, adminClient, topicName, subscriptionName)
}

func TestAdminClient_TopicAndSubscription_WithActionAndFilterDefaultSubscriptionRule(t *testing.T) {
adminClient, topicName := createAdminClientWithTestTopic(t)

defer deleteTopic(t, adminClient, topicName)

ruleAction := &SQLAction{
Expression: "SET MessageID=@stringVar",
Parameters: map[string]any{
Expand Down Expand Up @@ -667,8 +669,6 @@ func TestAdminClient_TopicAndSubscription_WithActionAndFilterDefaultSubscription
require.Equal(t, "$Default", defaultRule.Name)
require.EqualValues(t, ruleAction, defaultRule.Action)
require.EqualValues(t, ruleFilter, defaultRule.Filter)

defer deleteSubscription(t, adminClient, topicName, subscriptionName)
}

func createAdminClientWithTestTopic(t *testing.T) (*Client, string) {
Expand Down
26 changes: 26 additions & 0 deletions sdk/messaging/azservicebus/liveTestHelpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,29 @@ func peekSingleMessageForTest(t *testing.T, receiver *Receiver) *ReceivedMessage

return msg
}

func requireScheduledMessageDisappears(ctx context.Context, t *testing.T, receiver *Receiver, sequenceNumber int64) {
// this function will keep checking a particular sequence number until it's gone (ie, it was the last
// sequence number so it's obvious) _or_ we end up retrieving the next message instead since
// it auto-skips gaps.

for {
msgs, err := receiver.PeekMessages(ctx, 1, &PeekMessagesOptions{
FromSequenceNumber: &sequenceNumber,
})
require.NoError(t, err)

if len(msgs) == 0 {
// no message exists at the sequence number, and there was nowhere to jump to
return
}

if *msgs[0].SequenceNumber != sequenceNumber {
// the message is gone, we've been pushed to the next message after the "gap"
return
}

require.Equal(t, MessageStateScheduled, msgs[0].State)
time.Sleep(100 * time.Millisecond)
}
}
173 changes: 73 additions & 100 deletions sdk/messaging/azservicebus/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,74 +251,14 @@ func messageFromReceivedMessage(t *testing.T, receivedMessage *ReceivedMessage)
}

func Test_Sender_ScheduleAMQPMessages(t *testing.T) {
ctx := context.Background()

client, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()

receiver, err := client.NewReceiverForQueue(
queueName, &ReceiverOptions{ReceiveMode: ReceiveModeReceiveAndDelete})
require.NoError(t, err)
defer receiver.Close(context.Background())

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)
defer sender.Close(context.Background())

now := time.Now()
nearFuture := now.Add(20 * time.Second)

// there are two ways to schedule a message - you can use the
// `ScheduleMessages` API (in which case you get a sequence number that
// you can use with CancelScheduledMessage(s)) or you can set the
// `Scheduled`
sequenceNumbers, err := sender.ScheduleAMQPAnnotatedMessages(ctx,
[]*AMQPAnnotatedMessage{
{Body: AMQPAnnotatedMessageBody{Data: [][]byte{[]byte("To the future (that will be cancelled!)")}}},
{Body: AMQPAnnotatedMessageBody{Data: [][]byte{[]byte("To the future (not cancelled)")}}},
},
nearFuture, nil)

require.NoError(t, err)
require.EqualValues(t, 2, len(sequenceNumbers))

peekedMsg := peekSingleMessageForTest(t, receiver)
require.EqualValues(t, MessageStateScheduled, peekedMsg.State)

// cancel one of the ones scheduled using `ScheduleMessages`
err = sender.CancelScheduledMessages(ctx, []int64{sequenceNumbers[0]}, nil)
require.NoError(t, err)

// this isn't a typical way of doing this, but it's possible to set the field directly
// rather than using the simpler ScheduleAMQPMessages
err = sender.SendAMQPAnnotatedMessage(ctx,
&AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{[]byte("To the future (scheduled using the field)")},
},
MessageAnnotations: map[any]any{
"x-opt-scheduled-enqueue-time": &nearFuture,
},
}, nil)

require.NoError(t, err)

messages, err := receiver.ReceiveMessages(ctx, 2, nil)
require.NoError(t, err)

// we cancelled one of the messages so it won't get enqueued (this is the one that survived)
require.EqualValues(t, []string{"To the future (not cancelled)", "To the future (scheduled using the field)"}, getSortedBodies(messages))

for _, m := range messages {
// and the scheduled enqueue time should match what we set pretty closely.
diff := m.ScheduledEnqueueTime.Sub(nearFuture.UTC())

// add a little wiggle room, but the scheduled time and the time we set when we scheduled it.
require.LessOrEqual(t, diff, time.Second, "The requested scheduled time and the actual scheduled time should be close [%s]", m.ScheduledEnqueueTime)
}
testScheduleMessages(t, true)
}

func Test_Sender_ScheduleMessages(t *testing.T) {
testScheduleMessages(t, false)
}

func testScheduleMessages(t *testing.T, rawAMQP bool) {
ctx := context.Background()

client, cleanup, queueName := setupLiveTest(t, nil)
Expand All @@ -334,50 +274,83 @@ func Test_Sender_ScheduleMessages(t *testing.T) {
defer sender.Close(context.Background())

now := time.Now()
nearFuture := now.Add(20 * time.Second)

// there are two ways to schedule a message - you can use the
// `ScheduleMessages` API (in which case you get a sequence number that
// you can use with CancelScheduledMessage(s)) or you can set the
// `Scheduled`
sequenceNumbers, err := sender.ScheduleMessages(ctx,
[]*Message{
{Body: []byte("To the future (that will be cancelled!)")},
{Body: []byte("To the future (not cancelled)")},
},
nearFuture, nil)
farFuture := now.Add(time.Hour)

var sequenceNumbers []int64

if rawAMQP {
// there are two ways to schedule a message - you can use the
// `ScheduleMessages` API (in which case you get a sequence number that
// you can use with CancelScheduledMessage(s)) or you can set the
// `ScheduledEnqueueTime` field.
tmp, err := sender.ScheduleAMQPAnnotatedMessages(ctx,
[]*AMQPAnnotatedMessage{
{Body: AMQPAnnotatedMessageBody{Data: [][]byte{[]byte("To the future (that will be cancelled!)")}}},
{Body: AMQPAnnotatedMessageBody{Data: [][]byte{[]byte("To the future (not cancelled)")}}},
},
farFuture, nil)
require.NoError(t, err)
sequenceNumbers = tmp
} else {
// there are two ways to schedule a message - you can use the
// `ScheduleMessages` API (in which case you get a sequence number that
// you can use with CancelScheduledMessage(s)) or you can set the
// `ScheduledEnqueueTime` field.
tmp, err := sender.ScheduleMessages(ctx,
[]*Message{
{Body: []byte("To the future (that will be cancelled!)")},
{Body: []byte("To the future (not cancelled)")},
},
farFuture, nil)
require.NoError(t, err)
sequenceNumbers = tmp
}

require.NoError(t, err)
require.EqualValues(t, 2, len(sequenceNumbers))

peekedMsg := peekSingleMessageForTest(t, receiver)
require.EqualValues(t, MessageStateScheduled, peekedMsg.State)
if rawAMQP {
err := sender.SendAMQPAnnotatedMessage(ctx,
&AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{[]byte("To the future (scheduled using the field)")},
},
MessageAnnotations: map[any]any{
"x-opt-scheduled-enqueue-time": &farFuture,
},
}, nil)
require.NoError(t, err)
} else {
err := sender.SendAMQPAnnotatedMessage(ctx,
&AMQPAnnotatedMessage{
Body: AMQPAnnotatedMessageBody{
Data: [][]byte{[]byte("To the future (scheduled using the field)")},
},
MessageAnnotations: map[any]any{
"x-opt-scheduled-enqueue-time": &farFuture,
},
}, nil)
require.NoError(t, err)
}

// cancel one of the ones scheduled using `ScheduleMessages`
err = sender.CancelScheduledMessages(ctx, []int64{sequenceNumbers[0]}, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
var peekedMsgs []*ReceivedMessage

err = sender.SendMessage(ctx,
&Message{
Body: []byte("To the future (scheduled using the field)"),
ScheduledEnqueueTime: &nearFuture,
}, nil)
for len(peekedMsgs) < 3 {
msgs, err := receiver.PeekMessages(ctx, 1, nil)
require.NoError(t, err)

require.NoError(t, err)
if len(msgs) > 0 {
require.Equal(t, MessageStateScheduled, msgs[0].State)
peekedMsgs = append(peekedMsgs, msgs...)
}
}

messages, err := receiver.ReceiveMessages(ctx, 2, nil)
err = sender.CancelScheduledMessages(ctx, []int64{sequenceNumbers[0]}, nil)
require.NoError(t, err)

// we cancelled one of the messages so it won't get enqueued (this is the one that survived)
require.EqualValues(t, []string{"To the future (not cancelled)", "To the future (scheduled using the field)"}, getSortedBodies(messages))

for _, m := range messages {
// and the scheduled enqueue time should match what we set pretty closely.
diff := m.ScheduledEnqueueTime.Sub(nearFuture.UTC())

// add a little wiggle room, but the scheduled time and the time we set when we scheduled it.
require.LessOrEqual(t, diff, time.Second, "The requested scheduled time and the actual scheduled time should be close [%s]", m.ScheduledEnqueueTime)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
defer cancel()
requireScheduledMessageDisappears(ctx, t, receiver, sequenceNumbers[0])
}

func TestSender_SendMessagesDetach(t *testing.T) {
Expand Down