Skip to content

Commit

Permalink
PartitionKeyDelete: Adds API to delete all items in a partition key (#…
Browse files Browse the repository at this point in the history
…1490)

Added DeleteItemsInPartitionKeyAsync API
Changes in GetStoreProxy in DocumentClient.cs, GetHttpMethod in RequestInvokerHandler.cs, PrepareRequestMessageAsync in GatewayStoreClient.cs for GatewayMode of PkDeletePurger to work.
  • Loading branch information
Shaleen-Kalsi committed Jan 7, 2021
1 parent ce2bdab commit 0e338b7
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 4 deletions.
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6314,7 +6314,8 @@ internal IStoreModel GetStoreProxy(DocumentServiceRequest request)
(resourceType.IsScript() && operationType != OperationType.ExecuteJavaScript) ||
resourceType == ResourceType.PartitionKeyRange ||
resourceType == ResourceType.Snapshot ||
resourceType == ResourceType.ClientEncryptionKey)
resourceType == ResourceType.ClientEncryptionKey ||
(resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete))
{
return this.GatewayStoreModel;
}
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ private async ValueTask<HttpRequestMessage> PrepareRequestMessageAsync(
request.OperationType == OperationType.SqlQuery ||
request.OperationType == OperationType.Batch ||
request.OperationType == OperationType.ExecuteJavaScript ||
request.OperationType == OperationType.QueryPlan)
request.OperationType == OperationType.QueryPlan ||
(request.ResourceType == ResourceType.PartitionKey && request.OperationType == OperationType.Delete))
{
httpMethod = HttpMethod.Post;
}
Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public virtual async Task<ResponseMessage> SendAsync(

try
{
HttpMethod method = RequestInvokerHandler.GetHttpMethod(operationType);
HttpMethod method = RequestInvokerHandler.GetHttpMethod(resourceType, operationType);
RequestMessage request = new RequestMessage(
method,
resourceUriString,
Expand Down Expand Up @@ -284,6 +284,7 @@ public virtual async Task<ResponseMessage> SendAsync(
}

internal static HttpMethod GetHttpMethod(
ResourceType resourceType,
OperationType operationType)
{
if (operationType == OperationType.Create ||
Expand All @@ -293,7 +294,8 @@ internal static HttpMethod GetHttpMethod(
operationType == OperationType.QueryPlan ||
operationType == OperationType.Batch ||
operationType == OperationType.ExecuteJavaScript ||
operationType == OperationType.CompleteUserTransaction)
operationType == OperationType.CompleteUserTransaction ||
(resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete))
{
return HttpMethod.Post;
}
Expand Down
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,24 @@ public abstract ChangeFeedEstimator GetChangeFeedEstimator(
/// <returns>A new instance of <see cref="TransactionalBatch"/>.</returns>
public abstract TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey);

#if INTERNAL
/// <summary>
/// Deletes all items in the Container with the specified <see cref="PartitionKey"/> value.
/// Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
/// The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
/// </summary>
/// <param name="partitionKey">The <see cref="PartitionKey"/> of the items to be deleted.</param>
/// <param name="requestOptions">(Optional) The options for the Partition Key Delete request.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>
/// A <see cref="Task"/> containing a <see cref="ResponseMessage"/>.
/// </returns>
public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken));
#endif

#if PREVIEW
/// <summary>
/// Obtains a list of <see cref="FeedRange"/> that can be used to parallelize Feed operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,40 @@ public override async Task<PartitionKey> GetPartitionKeyValueFromStreamAsync(
}
}

public Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
CosmosDiagnosticsContext diagnosticsContext,
ITrace trace,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
Cosmos.PartitionKey? resultingPartitionKey;
if (requestOptions != null && requestOptions.IsEffectivePartitionKeyRouting)
{
resultingPartitionKey = null;
}
else
{
resultingPartitionKey = partitionKey;
}

ContainerCore.ValidatePartitionKey(resultingPartitionKey, requestOptions);

return this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.LinkUri,
resourceType: ResourceType.PartitionKey,
operationType: OperationType.Delete,
requestOptions: requestOptions,
cosmosContainerCore: this,
partitionKey: resultingPartitionKey,
itemId: null,
streamPayload: null,
requestEnricher: null,
trace: trace,
diagnosticsContext: diagnosticsContext,
cancellationToken: cancellationToken);
}

private static bool TryParseTokenListForElement(CosmosObject pathTraversal, IReadOnlyList<string> tokens, out CosmosElement result)
{
result = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,5 +454,16 @@ public override IAsyncEnumerable<TryCatch<ReadFeedPage>> GetReadFeedAsyncEnumera
{
return base.GetReadFeedAsyncEnumerable(state, requestOptions);
}

public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
{
return this.ClientContext.OperationHelperAsync(
nameof(DeleteAllItemsByPartitionKeyStreamAsync),
requestOptions,
(diagnostics, trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, diagnostics, trace, requestOptions, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public abstract Task<ItemResponse<T>> PatchItemAsync<T>(
IReadOnlyList<PatchOperation> patchOperations,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default);

public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken));
#endif

#if !PREVIEW
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,59 @@ await feedIterator.ReadNextAsync(this.cancellationToken))
Assert.AreEqual(itemIds.Count, 0);
}

[TestMethod]
public async Task PartitionKeyDeleteTest()
{
string pKString = "PK1";
string pKString2 = "PK2";
dynamic testItem1 = new
{
id = "item1",
status = pKString
};

dynamic testItem2 = new
{
id = "item2",
status = pKString
};

dynamic testItem3 = new
{
id = "item3",
status = pKString2
};

ContainerInternal containerInternal = (ContainerInternal)this.Container;
ItemResponse<dynamic> itemResponse = await this.Container.CreateItemAsync<dynamic>(testItem1);
ItemResponse<dynamic> itemResponse2 = await this.Container.CreateItemAsync<dynamic>(testItem2);
ItemResponse<dynamic> itemResponse3 = await this.Container.CreateItemAsync<dynamic>(testItem3);
Cosmos.PartitionKey partitionKey1 = new Cosmos.PartitionKey(pKString);
Cosmos.PartitionKey partitionKey2 = new Cosmos.PartitionKey(pKString2);
using (ResponseMessage pKDeleteResponse = await containerInternal.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey1))
{
Assert.AreEqual(pKDeleteResponse.StatusCode, HttpStatusCode.OK);
}

using (ResponseMessage readResponse = await this.Container.ReadItemStreamAsync("item1", partitionKey1))
{
Assert.AreEqual(readResponse.StatusCode, HttpStatusCode.NotFound);
Assert.AreEqual(readResponse.Headers.SubStatusCode, SubStatusCodes.Unknown);
}

using (ResponseMessage readResponse = await this.Container.ReadItemStreamAsync("item2", partitionKey1))
{
Assert.AreEqual(readResponse.StatusCode, HttpStatusCode.NotFound);
Assert.AreEqual(readResponse.Headers.SubStatusCode, SubStatusCodes.Unknown);
}

//verify item with the other Partition Key is not deleted
using (ResponseMessage readResponse = await this.Container.ReadItemStreamAsync("item3", partitionKey2))
{
Assert.AreEqual(readResponse.StatusCode, HttpStatusCode.OK);
}
}

[TestMethod]
public async Task ItemCustomSerialzierTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,17 @@ public async Task AllowBatchingRequestsSendsToExecutor_Patch()
mockedExecutor.Verify(c => c.AddAsync(It.IsAny<ItemBatchOperation>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
}

[TestMethod]
public async Task PartitionKeyDeleteUnitTest()
{
dynamic item = new
{
id = Guid.NewGuid().ToString(),
pk = "FF627B77-568E-4541-A47E-041EAC10E46F",
};
await this.VerifyPartitionKeyDeleteOperation(new Cosmos.PartitionKey(item.pk), "[\"FF627B77-568E-4541-A47E-041EAC10E46F\"]");
}

[TestMethod]
public async Task TestNestedPartitionKeyValueFromStreamAsync()
{
Expand Down Expand Up @@ -870,6 +881,42 @@ private async Task VerifyItemOperations(
Assert.AreEqual(10, testHandlerHitCount, "A stream operation did not make it to the handler");
}

private async Task VerifyPartitionKeyDeleteOperation(
Cosmos.PartitionKey partitionKey,
string partitionKeySerialized,
RequestOptions requestOptions = null)
{
ResponseMessage response = null;
HttpStatusCode httpStatusCode = HttpStatusCode.OK;
int testHandlerHitCount = 0;
TestHandler testHandler = new TestHandler((request, cancellationToken) =>
{
Assert.IsTrue(request.RequestUri.OriginalString.StartsWith(@"dbs/testdb/colls/testcontainer"));
Assert.AreEqual(requestOptions, request.RequestOptions);
Assert.AreEqual(ResourceType.PartitionKey, request.ResourceType);
Assert.IsNotNull(request.Headers.PartitionKey);
Assert.AreEqual(partitionKeySerialized, request.Headers.PartitionKey);
testHandlerHitCount++;
response = new ResponseMessage(httpStatusCode, request, errorMessage: null);
response.Content = request.Content;
return Task.FromResult(response);
});

CosmosClient client = MockCosmosUtil.CreateMockCosmosClient(
(builder) => builder.AddCustomHandlers(testHandler));

Container container = client.GetDatabase("testdb")
.GetContainer("testcontainer");

ContainerInternal containerInternal = (ContainerInternal)container;
ResponseMessage responseMessage = await containerInternal.DeleteAllItemsByPartitionKeyStreamAsync(
partitionKey: partitionKey,
requestOptions: requestOptions);
Assert.IsNotNull(responseMessage);
Assert.AreEqual(httpStatusCode, responseMessage.StatusCode);
Assert.AreEqual(1, testHandlerHitCount, "The operation did not make it to the handler");
}

private Mock<BatchAsyncContainerExecutor> GetMockedBatchExcecutor()
{
Mock<BatchAsyncContainerExecutor> mockedExecutor = new Mock<BatchAsyncContainerExecutor>();
Expand Down

0 comments on commit 0e338b7

Please sign in to comment.