Skip to content

Commit

Permalink
[Event Hubs Client] Storage Implementation Feedback (Azure#18439)
Browse files Browse the repository at this point in the history
The focus of these changes is to consider feedback provided by an internal
review of the blob storage manager by one of the developers responsible for
the Azure Blob Storage client library.  Many of the larger and more critical
items have already been used for adjustments; these tweaks address the majority
of the remaining minor items.
  • Loading branch information
jsquire authored and jongio committed Feb 9, 2021
1 parent d9f774e commit 0a98c77
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ internal partial class BlobsCheckpointStore : StorageManager
private static readonly string BlobsResourceDoesNotExist = "The Azure Storage Blobs container or blob used by the Event Processor Client does not exist.";
#pragma warning restore CA1810

/// <summary>A regular expression used to capture strings enclosed in double quotes.</summary>
private static readonly Regex DoubleQuotesExpression = new Regex("\"(.*)\"", RegexOptions.Compiled);

/// <summary>An ETag value to be used for permissive matching when querying Storage.</summary>
private static readonly ETag IfNoneMatchAllTag = new ETag("*");

Expand Down Expand Up @@ -126,7 +123,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOw
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup);

List<EventProcessorPartitionOwnership> result = new List<EventProcessorPartitionOwnership>();
var result = new List<EventProcessorPartitionOwnership>();

try
{
Expand Down Expand Up @@ -156,7 +153,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOw
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
finally
{
Expand Down Expand Up @@ -231,14 +228,9 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimO
}

// Small workaround to retrieve the eTag. The current storage SDK returns it enclosed in
// double quotes ('"ETAG_VALUE"' instead of 'ETAG_VALUE').
// double quotes ("ETAG_VALUE" instead of ETAG_VALUE).

var match = DoubleQuotesExpression.Match(ownership.Version);

if (match.Success)
{
ownership.Version = match.Groups[1].ToString();
}
ownership.Version = ownership.Version?.Trim('"');

claimedOwnership.Add(ownership);
OwnershipClaimed(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier);
Expand All @@ -250,7 +242,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimO
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode == BlobErrorCode.BlobNotFound)
{
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -303,20 +295,23 @@ public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpoint

if (InitializeWithLegacyCheckpoints)
{
// Legacy checkpoints are not normalized to lowercase
// Legacy checkpoints are not normalized to lowercase.

var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup);

await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
{
// Skip new checkpoints and empty blobs
// Skip new checkpoints and empty blobs.

if (blob.Properties.ContentLength == 0)
{
continue;
}

var partitionId = blob.Name.Substring(legacyPrefix.Length);

// Check whether there is already a checkpoint for this partition id
// Check whether there is already a checkpoint for this partition id.

if (checkpoints.Any(existingCheckpoint => string.Equals(existingCheckpoint.PartitionId, partitionId, StringComparison.Ordinal)))
{
continue;
Expand All @@ -335,7 +330,7 @@ public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpoint
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -502,7 +497,8 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
}
else
{
// Skip checkpoints without an offset without logging an error
// Skip checkpoints without an offset without logging an error.

return null;
}
}
Expand Down Expand Up @@ -563,14 +559,15 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp
catch (RequestFailedException ex) when ((ex.ErrorCode == BlobErrorCode.BlobNotFound) || (ex.ErrorCode == BlobErrorCode.ContainerNotFound))
{
// If the blob wasn't present, fall-back to trying to create a new one.

using var blobContent = new MemoryStream(Array.Empty<byte>());
await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,16 @@ public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancin
.ConfigureAwait(false))
.ToList();
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw new TaskCanceledException();
}
catch (Exception ex)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// If ownership list retrieval fails, give up on the current cycle. There's nothing more we can do
// without an updated ownership list. Set the EventHubName to null so it doesn't modify the exception
// message. This exception message is used so the processor can retrieve the raw Operation string, and
Expand All @@ -198,14 +204,12 @@ public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancin
return default;
}

var unclaimedPartitions = new HashSet<string>(partitionIds);

// Create a partition distribution dictionary from the complete ownership list we have, mapping an owner's identifier to the list of
// partitions it owns. When an event processor goes down and it has only expired ownership, it will not be taken into consideration
// by others. The expiration time defaults to 30 seconds, but it may be overridden by a derived class.

var unclaimedPartitions = new HashSet<string>(partitionIds);
var utcNow = GetDateTimeOffsetNow();

var activeOwnership = default(EventProcessorPartitionOwnership);

ActiveOwnershipWithDistribution.Clear();
Expand Down Expand Up @@ -290,7 +294,6 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
});

await StorageManager.ClaimOwnershipAsync(ownershipToRelinquish, cancellationToken).ConfigureAwait(false);

InstanceOwnership.Clear();
}

Expand Down Expand Up @@ -422,7 +425,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio

// No ownership has been claimed.

return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default(EventProcessorPartitionOwnership)));
return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default));
}

/// <summary>
Expand Down

0 comments on commit 0a98c77

Please sign in to comment.