Skip to content

Commit

Permalink
Ensure only a single action for a document is added to a batch (Azure…
Browse files Browse the repository at this point in the history
…#18469)

* Ensure only a single action for a document is added to a batch

* Add test to validate batch splitting behavior

* Add a test to verify case-sensitive nature of keys
  • Loading branch information
Mohit-Chakraborty authored and jongio committed Feb 9, 2021
1 parent fe1c76d commit 3a007af
Show file tree
Hide file tree
Showing 6 changed files with 738 additions and 3 deletions.
8 changes: 5 additions & 3 deletions sdk/search/Azure.Search.Documents/src/Batching/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,14 @@ await SubmitBatchAsync(
// Returns whether the batch is now full.
bool FillBatchFromQueue(List<PublisherAction<T>> batch, Queue< PublisherAction<T>> queue)
{
// TODO: Consider tracking the keys in the batch and requiring
// them to be unique to avoid error alignment problems
HashSet<string> documentIdentifiers = new HashSet<string>(StringComparer.Ordinal);

while (queue.Count > 0)
{
if (batch.Count < BatchActionCount)
// Stop filling the batch if we run into an action for a document that is already in the batch.
// We want to keep the actions ordered and map any errors accurately to the documents,
// so we need to split the batch on encountering an action for a previously queued document.
if ((batch.Count < BatchActionCount) && documentIdentifiers.Add(queue.Peek().Key))
{
batch.Add(queue.Dequeue());
}
Expand Down
117 changes: 117 additions & 0 deletions sdk/search/Azure.Search.Documents/tests/Batching/BatchingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,123 @@ public async Task Behavior_BatchSize()
Assert.AreEqual(5, client.Submissions.Count);
}

[Test]
public async Task Behavior_SplitBatchByDocumentKey()
{
int numberOfDocuments = 5;

await using SearchResources resources = await SearchResources.CreateWithEmptyIndexAsync<SimpleDocument>(this);
BatchingSearchClient client = GetBatchingSearchClient(resources);

SimpleDocument[] data = new SimpleDocument[numberOfDocuments];
for (int i = 0; i < numberOfDocuments; i++)
{
data[i] = new SimpleDocument() { Id = $"{i}", Name = $"Document #{i}" };
}

// 'SimpleDocument' has 'Id' set as its key field.
// Set the Ids of 2 documents in the group to be the same.
// We expect the batch to be split at this index, even though the size of the set is smaller than the batch size.
data[3].Id = data[0].Id;

await using SearchIndexingBufferedSender<SimpleDocument> indexer =
client.CreateIndexingBufferedSender(
new SearchIndexingBufferedSenderOptions<SimpleDocument>()
{
// Set the expected batch action count to be larger than the number of documents in the set.
InitialBatchActionCount = numberOfDocuments + 1,
});

// Throw from every handler
int sent = 0, completed = 0;
indexer.ActionSent += e =>
{
sent++;
// Batch will be split at the 4th document.
// So, 3 documents will be sent before any are submitted, but 3 submissions will be made before the last 2 are sent
Assert.AreEqual((sent <= 3) ? 0 : 3, completed);
throw new InvalidOperationException("ActionSentAsync: Should not be seen!");
};

indexer.ActionCompleted += e =>
{
completed++;
// Batch will be split at the 4th document.
// So, 3 documents will be submitted after 3 are sent, and the last 2 submissions will be made after all 5 are sent
Assert.AreEqual((completed <= 3) ? 3 : 5, sent);
throw new InvalidOperationException("ActionCompletedAsync: Should not be seen!");
};

AssertNoFailures(indexer);
await indexer.UploadDocumentsAsync(data);
await indexer.FlushAsync();

Assert.AreEqual(5, sent);
Assert.AreEqual(5, completed);
}

[Test]
public async Task Behavior_SplitBatchByDocumentKeyIgnoreCaseDifferences()
{
int numberOfDocuments = 5;

await using SearchResources resources = await SearchResources.CreateWithEmptyIndexAsync<SimpleDocument>(this);
BatchingSearchClient client = GetBatchingSearchClient(resources);

// 'SimpleDocument' has 'Id' set as its key field.
// Set the Ids of 2 documents in the group to differ only in case from another 2.
// We expect the batch to NOT be split because keys are case-sensitive and the publisher should consider them all unique.
SimpleDocument[] data = new SimpleDocument[]
{
new SimpleDocument() { Id = "a", Name = "Document a" },
new SimpleDocument() { Id = "b", Name = "Document b" },
new SimpleDocument() { Id = "c", Name = "Document c" },
new SimpleDocument() { Id = "A", Name = "Document A" },
new SimpleDocument() { Id = "B", Name = "Document B" },
};

await using SearchIndexingBufferedSender<SimpleDocument> indexer =
client.CreateIndexingBufferedSender(
new SearchIndexingBufferedSenderOptions<SimpleDocument>()
{
// Make sure the expected batch action is larger than the number of documents in the set.
InitialBatchActionCount = numberOfDocuments + 1,
});

// Throw from every handler
int sent = 0, completed = 0;
indexer.ActionSent += e =>
{
sent++;
// Batch will not be split. So, no document will be submitted before all are sent.
Assert.AreEqual(0, completed);
throw new InvalidOperationException("ActionSentAsync: Should not be seen!");
};

indexer.ActionCompleted += e =>
{
completed++;
// Batch will not be split. So, all documents will be sent before any are submitted.
Assert.AreEqual(5, sent);
throw new InvalidOperationException("ActionCompletedAsync: Should not be seen!");
};

AssertNoFailures(indexer);
await indexer.UploadDocumentsAsync(data);
await indexer.FlushAsync();

Assert.AreEqual(5, sent);
Assert.AreEqual(5, completed);
}

[Test]
[TestCase(409)]
[TestCase(422)]
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3a007af

Please sign in to comment.