[azeventhubs] Fixing checkpoint store race condition #20727

Merged
1 change: 1 addition & 0 deletions sdk/messaging/azeventhubs/CHANGELOG.md
@@ -14,6 +14,7 @@
- Potential leaks for $cbs and $management when there was a partial failure. (PR#20564)
- Latest go-amqp changes have been merged in with fixes for robustness.
- Sending a message to an entity that is full will no longer retry. (PR#20722)
- Checkpoint store handles multiple initial owners properly, allowing only one through. (PR#20727)

## 0.6.0 (2023-03-07)

80 changes: 54 additions & 26 deletions sdk/messaging/azeventhubs/checkpoints/blob_store.go
@@ -44,6 +44,9 @@ func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions)

// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
// the actual partitions that were claimed.
//
// If we fail to claim ownership because of another update then it will be omitted from the
// returned slice of [Ownership]'s. It is not considered an error.
func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []azeventhubs.Ownership, options *azeventhubs.ClaimOwnershipOptions) ([]azeventhubs.Ownership, error) {
var ownerships []azeventhubs.Ownership

@@ -54,13 +57,12 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze
if err != nil {
return nil, err
}
lastModified, etag, err := b.setOwnershipMetadata(ctx, blobName, po)

if err != nil {
if bloberror.HasCode(err,
bloberror.ConditionNotMet, // updated before we could update it
bloberror.BlobAlreadyExists) { // created before we could create it
continue
}
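For review context, a minimal caller-side sketch of that contract (hypothetical code, not part of this PR; the store variable, namespace, and partition IDs are made up). A caller claims with whatever state it holds locally and treats any partition missing from the result as won by another owner:

	ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
		{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "hub", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "my-owner-id"},
		{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "hub", FullyQualifiedNamespace: "ns", PartitionID: "1", OwnerID: "my-owner-id"},
	}, nil)

	if err != nil {
		// a real storage/transport failure - losing a claim race does NOT land here
		return err
	}

	// partitions absent from `ownerships` were claimed by another owner first
	for _, o := range ownerships {
		fmt.Printf("claimed partition %s (etag: %v)\n", o.PartitionID, *o.ETag)
	}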

@@ -179,25 +181,28 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
}

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
//
// NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed.
func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.UpdateCheckpointOptions) error {
blobName, err := nameForCheckpointBlob(checkpoint)

if err != nil {
return err
}

_, _, err = b.setCheckpointMetadata(ctx, blobName, checkpoint)
return err
}

func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, ownership azeventhubs.Ownership) (*time.Time, azcore.ETag, error) {
blobMetadata := newOwnershipBlobMetadata(ownership)
blobClient := b.cc.NewBlockBlobClient(blobName)

if ownership.ETag != nil {
setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, &blob.SetMetadataOptions{
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfMatch: ownership.ETag,
},
},
})
@@ -207,29 +212,52 @@ func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetada
}

return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}

uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
IfNoneMatch: to.Ptr(azcore.ETag("*")),
},
},
})

if err != nil {
return nil, "", err
}

return uploadResp.LastModified, *uploadResp.ETag, nil
}

// setCheckpointMetadata sets the metadata for a checkpoint, falling back to creating
// the blob if it doesn't already exist.
//
// NOTE: unlike [setOwnershipMetadata] this function doesn't attempt to prevent simultaneous
// checkpoint updates - ownership is assumed.
func (b *BlobStore) setCheckpointMetadata(ctx context.Context, blobName string, checkpoint azeventhubs.Checkpoint) (*time.Time, azcore.ETag, error) {
blobMetadata := newCheckpointBlobMetadata(checkpoint)
blobClient := b.cc.NewBlockBlobClient(blobName)

setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, nil)

if err == nil {
return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}

if !bloberror.HasCode(err, bloberror.BlobNotFound) {
return nil, "", err
}

uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
})

if err != nil {
return nil, "", err
}

return uploadResp.LastModified, *uploadResp.ETag, nil
}

func nameForCheckpointBlob(a azeventhubs.Checkpoint) (string, error) {
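The fix itself is the standard blob optimistic-concurrency pattern: updates are guarded with If-Match on the last ETag the claimant saw, and first-time creates are guarded with If-None-Match: "*", so exactly one concurrent writer wins either way. A self-contained sketch of that pattern (the claimBlob helper is illustrative only, not part of the azeventhubs API):

	import (
		"bytes"
		"context"

		"github.com/Azure/azure-sdk-for-go/sdk/azcore"
		"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
		"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
		"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
		"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	)

	// claimBlob is a minimal sketch of the conditional-write pattern this PR adopts;
	// it is not part of the azeventhubs API.
	func claimBlob(ctx context.Context, client *blockblob.Client, metadata map[string]*string, etag *azcore.ETag) error {
		if etag != nil {
			// update path: succeeds only if the blob is unchanged since we read it;
			// otherwise the service returns bloberror.ConditionNotMet
			_, err := client.SetMetadata(ctx, metadata, &blob.SetMetadataOptions{
				AccessConditions: &blob.AccessConditions{
					ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfMatch: etag},
				},
			})
			return err
		}

		// create path: IfNoneMatch "*" means "only if the blob doesn't exist yet";
		// a concurrent creator that loses gets bloberror.BlobAlreadyExists
		_, err := client.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
			Metadata: metadata,
			AccessConditions: &blob.AccessConditions{
				ModifiedAccessConditions: &blob.ModifiedAccessConditions{IfNoneMatch: to.Ptr(azcore.ETag("*"))},
			},
		})
		return err
	}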
121 changes: 121 additions & 0 deletions sdk/messaging/azeventhubs/checkpoints/blob_store_test.go
@@ -4,6 +4,7 @@ package checkpoints_test

import (
"context"
"fmt"
"os"
"strconv"
"testing"
@@ -216,6 +217,126 @@ func TestBlobStore_ListAndClaim(t *testing.T) {
require.Empty(t, claimedOwnerships)
}

func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
claimsCh := make(chan []azeventhubs.Ownership, 20)

t.Logf("Starting %d goroutines to claim ownership without an etag", cap(claimsCh))

// attempt to claim the same partition from multiple goroutines. Only _one_ of the
// goroutines should walk away thinking it claimed the partition.
for i := 0; i < cap(claimsCh); i++ {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)

if err != nil {
claimsCh <- nil
require.NoError(t, err)
} else {
claimsCh <- ownerships
}
}()
}

claimed := map[string]bool{}
numFailedClaims := 0

for i := 0; i < cap(claimsCh); i++ {
claims := <-claimsCh

if claims == nil {
numFailedClaims++
continue
}

for _, claim := range claims {
require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID))
require.NotNil(t, claim.ETag)
claimed[claim.PartitionID] = true
}
}

require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("Only 1 of the %d claims should win; the rest fail", cap(claimsCh)))
}

func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
claimsCh := make(chan []azeventhubs.Ownership, 20)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)
require.NoError(t, err)
require.Equal(t, "0", ownerships[0].PartitionID)
require.NotNil(t, ownerships[0].ETag)

t.Logf("Starting %d goroutines to claim ownership using the etag from the initial claim", cap(claimsCh))

// attempt to claim the same partition from multiple goroutines. Only _one_ of the
// goroutines should walk away thinking it claimed the partition.
for i := 0; i < cap(claimsCh); i++ {
go func() {
ownerships, err := store.ClaimOwnership(ctx, ownerships, nil)

if err != nil {
claimsCh <- nil
require.NoError(t, err)
} else {
claimsCh <- ownerships
}
}()
}

claimed := map[string]bool{}
numFailedClaims := 0

for i := 0; i < cap(claimsCh); i++ {
claims := <-claimsCh

if claims == nil {
numFailedClaims++
continue
}

for _, claim := range claims {
require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID))
require.NotNil(t, claim.ETag)
claimed[claim.PartitionID] = true
}
}

require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("Only 1 of the %d claims should win; the rest fail", cap(claimsCh)))
}
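A losing goroutine here simply receives an empty slice; a real processor would typically re-list ownership to discover the winner before retrying. A hypothetical follow-up, assuming the ListOwnership signature shown earlier in this diff and reusing the test's placeholder namespace and hub names:

	// refresh our view of partition owners after losing the claim race
	ownerships, err := store.ListOwnership(ctx, "ns", "name", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		return err
	}

	for _, o := range ownerships {
		fmt.Printf("partition %s is owned by %s\n", o.PartitionID, o.OwnerID)
	}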

func getContainerClient(t *testing.T) struct {
ConnectionString string
ContainerName string
7 changes: 7 additions & 0 deletions sdk/messaging/azeventhubs/internal/test/test_helpers.go
@@ -40,6 +40,13 @@ func CaptureLogsForTest() func() []string {

func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string {
setAzLogListener(func(e azlog.Event, s string) {
defer func() {
if err := recover(); err != nil {
fmt.Printf("FAILED SENDING MESSAGE (%s), message was: [%s] %s\n", err, e, s)
panic(err)
}
}()

messagesCh <- fmt.Sprintf("[%s] %s", e, s)
})

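For context on this test_helpers change: a log listener can fire after the test has closed messagesCh, and sending on a closed channel panics; the deferred recover above annotates that panic with the lost message before re-raising it. A standalone sketch of the failure mode it guards against:

	package main

	import "fmt"

	func main() {
		ch := make(chan string, 1)
		close(ch)

		defer func() {
			if err := recover(); err != nil {
				// err is "send on closed channel" - annotate, then propagate
				fmt.Printf("FAILED SENDING MESSAGE (%s)\n", err)
				panic(err)
			}
		}()

		ch <- "too late" // panics: send on closed channel
	}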