Skip to content

Commit

Permalink
Bulk: Fixes incorrect routing on split (#2069)
Browse files Browse the repository at this point in the history
* increasing retry

* Correctly routing after split

* Adding tests

* More tests

* using cosmosexception

Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
ealsur and j82w committed Dec 16, 2020
1 parent 470c3dd commit 535d197
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 52 deletions.
18 changes: 8 additions & 10 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);

Expand All @@ -52,11 +51,6 @@ public BatchAsyncContainerExecutor(
int maxServerRequestOperationCount,
int maxServerRequestBodyLength)
{
if (cosmosContainer == null)
{
throw new ArgumentNullException(nameof(cosmosContainer));
}

if (maxServerRequestOperationCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount));
Expand All @@ -67,7 +61,7 @@ public BatchAsyncContainerExecutor(
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}

this.cosmosContainer = cosmosContainer;
this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer));
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
Expand All @@ -89,7 +83,7 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
Expand Down Expand Up @@ -136,10 +130,13 @@ internal virtual async Task ValidateOperationAsync(
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
containerInternal,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}
Expand Down Expand Up @@ -185,6 +182,7 @@ private async Task ReBatchAsync(
CancellationToken cancellationToken)
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ItemBatchOperationContext : IDisposable
{
public string PartitionKeyRangeId { get; }
public string PartitionKeyRangeId { get; private set; }

public BatchAsyncBatcher CurrentBatcher { get; set; }

Expand Down Expand Up @@ -74,6 +74,11 @@ public void Fail(
this.Dispose();
}

/// <summary>
/// Replaces the partition key range id recorded on this context.
/// </summary>
/// <param name="newPartitionKeyRangeId">Value to store in <see cref="PartitionKeyRangeId"/>.</param>
public void ReRouteOperation(string newPartitionKeyRangeId) => this.PartitionKeyRangeId = newPartitionKeyRangeId;

public void Dispose()
{
this.CurrentBatcher = null;
Expand Down
81 changes: 47 additions & 34 deletions Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -17,77 +18,89 @@ namespace Microsoft.Azure.Cosmos
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;
private readonly ContainerInternal container;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
public BulkPartitionKeyRangeGoneRetryPolicy(
ContainerInternal container,
IDocumentClientRetryPolicy nextRetryPolicy)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.nextRetryPolicy = nextRetryPolicy;
}

/// <summary>
/// Decides whether an operation that failed with <paramref name="exception"/> should be retried.
/// </summary>
/// <param name="exception">The failure raised by the operation.</param>
/// <param name="cancellationToken">Token forwarded to the internal check and to the wrapped policy.</param>
/// <returns>
/// The result of the internal status-code check when the exception is a <see cref="CosmosException"/> and that check
/// produces a decision; otherwise the wrapped policy's decision, or <see cref="ShouldRetryResult.NoRetry"/> when no
/// wrapped policy was supplied.
/// </returns>
public async Task<ShouldRetryResult> ShouldRetryAsync(
    Exception exception,
    CancellationToken cancellationToken)
{
    if (exception is CosmosException clientException)
    {
        ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
            clientException.StatusCode,
            (SubStatusCodes)clientException.SubStatusCode,
            cancellationToken);

        if (shouldRetryResult != null)
        {
            return shouldRetryResult;
        }
    }

    // This guard must sit outside the CosmosException branch: any other exception type
    // would otherwise fall through and dereference a null nextRetryPolicy.
    if (this.nextRetryPolicy == null)
    {
        return ShouldRetryResult.NoRetry();
    }

    return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.GetResourceAddress());
cancellationToken);
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

/// <summary>
/// Forwards the outgoing request to the wrapped retry policy, if one was supplied.
/// </summary>
/// <param name="request">The request passed through to the wrapped policy.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
    // nextRetryPolicy is allowed to be null (both ShouldRetryAsync overloads check for it),
    // so the call is guarded here as well instead of throwing NullReferenceException.
    this.nextRetryPolicy?.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode,
string resourceIdOrFullName)
CancellationToken cancellationToken)
{
if (statusCode == HttpStatusCode.Gone
&& (subStatusCode == SubStatusCodes.PartitionKeyRangeGone || subStatusCode == SubStatusCodes.NameCacheIsStale)
&& this.retriesAttempted < MaxRetries)
if (statusCode == HttpStatusCode.Gone)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
{
PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
string containerRid = await this.container.GetCachedRIDAsync(forceRefresh: false, cancellationToken: cancellationToken);
await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

if (subStatusCode == SubStatusCodes.NameCacheIsStale)
{
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}
}

return null;
Expand Down
Loading

0 comments on commit 535d197

Please sign in to comment.