diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
index 75e23c831f..d8e6b54008 100644
--- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
+++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
@@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos
///
internal class BatchAsyncContainerExecutor : IDisposable
{
- private const int DefaultDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);
@@ -53,11 +52,6 @@ public BatchAsyncContainerExecutor(
int maxServerRequestOperationCount,
int maxServerRequestBodyLength)
{
- if (cosmosContainer == null)
- {
- throw new ArgumentNullException(nameof(cosmosContainer));
- }
-
if (maxServerRequestOperationCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount));
@@ -68,7 +62,7 @@ public BatchAsyncContainerExecutor(
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}
- this.cosmosContainer = cosmosContainer;
+ this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer));
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
@@ -90,7 +84,7 @@ public virtual async Task AddAsync(
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
- ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
+ ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
@@ -137,10 +131,13 @@ internal virtual async Task ValidateOperationAsync(
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}
- private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
+ private static IDocumentClientRetryPolicy GetRetryPolicy(
+ ContainerInternal containerInternal,
+ RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
- new ResourceThrottleRetryPolicy(
+ containerInternal,
+ new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}
@@ -186,6 +183,7 @@ private async Task ReBatchAsync(
CancellationToken cancellationToken)
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
+ operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
diff --git a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
index f384b04497..eabf539789 100644
--- a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
+++ b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
@@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos
///
internal class ItemBatchOperationContext : IDisposable
{
- public string PartitionKeyRangeId { get; }
+ public string PartitionKeyRangeId { get; private set; }
public BatchAsyncBatcher CurrentBatcher { get; set; }
@@ -74,6 +74,11 @@ public void Fail(
this.Dispose();
}
+ public void ReRouteOperation(string newPartitionKeyRangeId)
+ {
+ this.PartitionKeyRangeId = newPartitionKeyRangeId;
+ }
+
public void Dispose()
{
this.CurrentBatcher = null;
diff --git a/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
index 6b55f2bdbc..0191c13e52 100644
--- a/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
+++ b/Microsoft.Azure.Cosmos/src/BulkPartitionKeyRangeGoneRetryPolicy.cs
@@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
///
@@ -17,57 +18,61 @@ namespace Microsoft.Azure.Cosmos
///
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
- private const int MaxRetries = 1;
-
private readonly IDocumentClientRetryPolicy nextRetryPolicy;
+ private readonly ContainerInternal container;
- private int retriesAttempted;
-
- public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
+ public BulkPartitionKeyRangeGoneRetryPolicy(
+ ContainerInternal container,
+ IDocumentClientRetryPolicy nextRetryPolicy)
{
+ this.container = container ?? throw new ArgumentNullException(nameof(container));
this.nextRetryPolicy = nextRetryPolicy;
}
- public Task ShouldRetryAsync(
+ public async Task ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
- DocumentClientException clientException = exception as DocumentClientException;
-
- ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
- clientException?.StatusCode,
- clientException?.GetSubStatus());
-
- if (shouldRetryResult != null)
+ if (exception is CosmosException clientException)
{
- return Task.FromResult(shouldRetryResult);
+ ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
+ clientException.StatusCode,
+ (SubStatusCodes)clientException.SubStatusCode,
+ cancellationToken);
+
+ if (shouldRetryResult != null)
+ {
+ return shouldRetryResult;
+ }
+
+ if (this.nextRetryPolicy == null)
+ {
+ return ShouldRetryResult.NoRetry();
+ }
}
- if (this.nextRetryPolicy == null)
- {
- return Task.FromResult(ShouldRetryResult.NoRetry());
- }
-
- return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
+ return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}
- public Task ShouldRetryAsync(
+ public async Task ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
- ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
- cosmosResponseMessage?.Headers.SubStatusCode);
+ ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
+ cosmosResponseMessage?.StatusCode,
+ cosmosResponseMessage?.Headers.SubStatusCode,
+ cancellationToken);
if (shouldRetryResult != null)
{
- return Task.FromResult(shouldRetryResult);
+ return shouldRetryResult;
}
if (this.nextRetryPolicy == null)
{
- return Task.FromResult(ShouldRetryResult.NoRetry());
+ return ShouldRetryResult.NoRetry();
}
- return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
+ return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}
public void OnBeforeSendRequest(DocumentServiceRequest request)
@@ -75,19 +80,27 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
this.nextRetryPolicy.OnBeforeSendRequest(request);
}
- private ShouldRetryResult ShouldRetryInternal(
+ private async Task ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
- SubStatusCodes? subStatusCode)
+ SubStatusCodes? subStatusCode,
+ CancellationToken cancellationToken)
{
- if (statusCode == HttpStatusCode.Gone
- && (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
- || subStatusCode == SubStatusCodes.NameCacheIsStale
- || subStatusCode == SubStatusCodes.CompletingSplit
- || subStatusCode == SubStatusCodes.CompletingPartitionMigration)
- && this.retriesAttempted < MaxRetries)
+ if (statusCode == HttpStatusCode.Gone)
{
- this.retriesAttempted++;
- return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
+ if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
+ || subStatusCode == SubStatusCodes.CompletingSplit
+ || subStatusCode == SubStatusCodes.CompletingPartitionMigration)
+ {
+ PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
+ string containerRid = await this.container.GetCachedRIDAsync(forceRefresh: false, cancellationToken: cancellationToken);
+ await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
+ return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
+ }
+
+ if (subStatusCode == SubStatusCodes.NameCacheIsStale)
+ {
+ return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
+ }
}
return null;
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs
index 5b6a032def..e0e85b85a2 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs
@@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Net;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
@@ -448,9 +449,11 @@ public async Task CannotAddToDispatchedBatch()
public async Task RetrierGetsCalledOnSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -475,9 +478,11 @@ public async Task RetrierGetsCalledOnSplit()
public async Task RetrierGetsCalledOnCompletingSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -502,9 +507,11 @@ public async Task RetrierGetsCalledOnCompletingSplit()
public async Task RetrierGetsCalledOnCompletingPartitionMigration()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
ItemBatchOperation operation1 = this.CreateItemBatchOperation();
@@ -545,6 +552,16 @@ public async Task RetrierGetsCalledOnOverFlow()
retryDelegate.Verify(a => a(It.IsAny(), It.IsAny()), Times.Once);
}
+ private static ContainerInternal GetSplitEnabledContainer()
+ {
+ Mock container = new Mock();
+ container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString());
+ Mock context = new Mock();
+ container.Setup(c => c.ClientContext).Returns(context.Object);
+ context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
+ return container.Object;
+ }
+
private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher
{
public BatchAsyncBatcherThatOverflows(
@@ -565,5 +582,28 @@ internal override async Task>(serverRequest, new ArraySegment(serverRequest.Operations.ToArray(), 1, 1));
}
}
+
+ private class ClientWithSplitDetection : MockDocumentClient
+ {
+ private readonly Mock partitionKeyRangeCache;
+
+ public ClientWithSplitDetection()
+ {
+ this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null);
+ this.partitionKeyRangeCache.Setup(
+ m => m.TryGetOverlappingRangesAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
+ )
+ ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
+ }
+
+ internal override Task GetPartitionKeyRangeCacheAsync()
+ {
+ return Task.FromResult(this.partitionKeyRangeCache.Object);
+ }
+
+ }
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs
index d204998afe..7b444f7363 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs
@@ -51,6 +51,11 @@ public async Task RetryOnSplit()
Mock mockContainer = new Mock();
mockContainer.Setup(x => x.LinkUri).Returns(link);
mockContainer.Setup(x => x.GetPartitionKeyDefinitionAsync(It.IsAny())).Returns(Task.FromResult(new PartitionKeyDefinition() { Paths = new Collection() { "/id" } }));
+ mockContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString());
+ Mock context = new Mock();
+ mockContainer.Setup(c => c.ClientContext).Returns(context.Object);
+ context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
+
CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap(
new[]
@@ -449,5 +454,28 @@ private class MyDocument
public bool Updated { get; set; }
}
+
+ private class ClientWithSplitDetection : MockDocumentClient
+ {
+ private readonly Mock partitionKeyRangeCache;
+
+ public ClientWithSplitDetection()
+ {
+ this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null);
+ this.partitionKeyRangeCache.Setup(
+ m => m.TryGetOverlappingRangesAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
+ )
+ ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
+ }
+
+ internal override Task GetPartitionKeyRangeCacheAsync()
+ {
+ return Task.FromResult(this.partitionKeyRangeCache.Object);
+ }
+
+ }
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs
index 40da9a2f34..989506ff6d 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs
@@ -5,11 +5,14 @@
namespace Microsoft.Azure.Cosmos.Tests
{
using System;
+ using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
[TestClass]
public class BatchAsyncOperationContextTests
@@ -92,6 +95,7 @@ public async Task ShouldRetry_NoPolicy()
public async Task ShouldRetry_WithPolicy_OnSuccess()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ Mock.Of(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK);
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
@@ -104,6 +108,7 @@ public async Task ShouldRetry_WithPolicy_OnSuccess()
public async Task ShouldRetry_WithPolicy_On429()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ Mock.Of(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests);
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
@@ -116,6 +121,7 @@ public async Task ShouldRetry_WithPolicy_On429()
public async Task ShouldRetry_WithPolicy_OnSplit()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone };
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
@@ -128,6 +134,7 @@ public async Task ShouldRetry_WithPolicy_OnSplit()
public async Task ShouldRetry_WithPolicy_OnCompletingSplit()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingSplit };
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
@@ -140,6 +147,7 @@ public async Task ShouldRetry_WithPolicy_OnCompletingSplit()
public async Task ShouldRetry_WithPolicy_OnCompletingPartitionMigration()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingPartitionMigration };
ItemBatchOperation operation = new ItemBatchOperation(OperationType.Create, 0, Cosmos.PartitionKey.Null);
@@ -147,5 +155,38 @@ public async Task ShouldRetry_WithPolicy_OnCompletingPartitionMigration()
ShouldRetryResult shouldRetryResult = await operation.Context.ShouldRetryAsync(result, default);
Assert.IsTrue(shouldRetryResult.ShouldRetry);
}
+
+ private static ContainerInternal GetSplitEnabledContainer()
+ {
+ Mock container = new Mock();
+ container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString());
+ Mock context = new Mock();
+ container.Setup(c => c.ClientContext).Returns(context.Object);
+ context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
+ return container.Object;
+ }
+
+ private class ClientWithSplitDetection : MockDocumentClient
+ {
+ private readonly Mock partitionKeyRangeCache;
+
+ public ClientWithSplitDetection()
+ {
+ this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null);
+ this.partitionKeyRangeCache.Setup(
+ m => m.TryGetOverlappingRangesAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
+ )
+ ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
+ }
+
+ internal override Task GetPartitionKeyRangeCacheAsync()
+ {
+ return Task.FromResult(this.partitionKeyRangeCache.Object);
+ }
+
+ }
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs
index fc276b5551..0548932c00 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs
@@ -4,11 +4,15 @@
namespace Microsoft.Azure.Cosmos.Tests
{
+ using System;
+ using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+ using Moq;
[TestClass]
public class BulkPartitionKeyRangeGoneRetryPolicyTests
@@ -17,6 +21,7 @@ public class BulkPartitionKeyRangeGoneRetryPolicyTests
public async Task NotRetryOnSuccess()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ Mock.Of(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.OK);
@@ -28,6 +33,7 @@ public async Task NotRetryOnSuccess()
public async Task RetriesOn429()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ Mock.Of(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult((HttpStatusCode)StatusCodes.TooManyRequests);
@@ -39,6 +45,7 @@ public async Task RetriesOn429()
public async Task RetriesOnSplits()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.PartitionKeyRangeGone };
@@ -50,6 +57,7 @@ public async Task RetriesOnSplits()
public async Task RetriesOnCompletingSplits()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingSplit };
@@ -61,11 +69,45 @@ public async Task RetriesOnCompletingSplits()
public async Task RetriesOnCompletingPartitionMigrationSplits()
{
IDocumentClientRetryPolicy retryPolicy = new BulkPartitionKeyRangeGoneRetryPolicy(
+ GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));
TransactionalBatchOperationResult result = new TransactionalBatchOperationResult(HttpStatusCode.Gone) { SubStatusCode = SubStatusCodes.CompletingPartitionMigration };
ShouldRetryResult shouldRetryResult = await retryPolicy.ShouldRetryAsync(result.ToResponseMessage(), default);
Assert.IsTrue(shouldRetryResult.ShouldRetry);
}
+
+ private static ContainerInternal GetSplitEnabledContainer()
+ {
+ Mock container = new Mock();
+ container.Setup(c => c.GetCachedRIDAsync(It.IsAny(), It.IsAny())).ReturnsAsync(Guid.NewGuid().ToString());
+ Mock context = new Mock();
+ container.Setup(c => c.ClientContext).Returns(context.Object);
+ context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
+ return container.Object;
+ }
+
+ private class ClientWithSplitDetection : MockDocumentClient
+ {
+ private readonly Mock partitionKeyRangeCache;
+
+ public ClientWithSplitDetection()
+ {
+ this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null);
+ this.partitionKeyRangeCache.Setup(
+ m => m.TryGetOverlappingRangesAsync(
+ It.IsAny(),
+ It.IsAny>(),
+ It.Is(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
+ )
+ ).Returns((string collectionRid, Documents.Routing.Range range, bool forceRefresh) => Task.FromResult>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
+ }
+
+ internal override Task GetPartitionKeyRangeCacheAsync()
+ {
+ return Task.FromResult(this.partitionKeyRangeCache.Object);
+ }
+
+ }
}
}