From 1cffd388f03ef3fb5f77df1fb8e56b2f1e50917f Mon Sep 17 00:00:00 2001 From: John Bowen Date: Thu, 13 Jul 2023 08:57:33 -0700 Subject: [PATCH] Improving record level error reporting for Cosmos --- .../CosmosDataSinkExtension.cs | 91 +++++++++++++------ 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs index d944d0d..abb5fab 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs @@ -1,8 +1,8 @@ using System.ComponentModel.Composition; using System.Diagnostics; using System.Dynamic; +using System.Net; using System.Text; -using System.Text.RegularExpressions; using Cosmos.DataTransfer.Interfaces; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Configuration; @@ -81,6 +81,7 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati await CosmosExtensionServices.VerifyContainerAccess(container, settings.Container, logger, cancellationToken); int addedCount = 0; + int inputCount = 0; var timer = Stopwatch.StartNew(); void ReportCount(int i) @@ -100,15 +101,17 @@ void ReportCount(int i) var addTasks = batch.Select(item => AddItemAsync(container, item, settings.PartitionKeyPath ?? settings.PartitionKeyPaths?.FirstOrDefault(), settings.WriteMode, retry, logger, cancellationToken)).ToList(); var results = await Task.WhenAll(addTasks); - ReportCount(results.Sum()); + ReportCount(results.Sum(i => i.ItemCount)); + inputCount += results.Length; } - logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}"); - } + if (addedCount != inputCount) + { + logger.LogWarning("Added {AddedCount} of {TotalCount} total records in {TotalSeconds}s", addedCount, inputCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}"); + throw new Exception($"Only {addedCount} of {inputCount} records were added to Cosmos"); + } - private static string StripSpecialChars(string displayName) - { - return Regex.Replace(displayName, "[^\\w]", "", RegexOptions.Compiled); + logger.LogInformation("Added {AddedCount} total records in {TotalSeconds}s", addedCount, $"{timer.ElapsedMilliseconds / 1000.0:F2}"); } private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRetryDuration) @@ -124,44 +127,68 @@ private static AsyncRetryPolicy GetRetryPolicy(int maxRetryCount, int initialRet return retryPolicy; } - private static Task AddItemAsync(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken) + private static Task AddItemAsync(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, AsyncRetryPolicy retryPolicy, ILogger logger, CancellationToken cancellationToken) { - logger.LogTrace("Adding item {Id}", GetPropertyValue(item, "id")); - var task = retryPolicy.ExecuteAsync(() => - { - switch (mode) - { - case DataWriteMode.InsertStream: - ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath)); - return container.CreateItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken); - case DataWriteMode.Insert: - return container.CreateItemAsync(item, cancellationToken: cancellationToken); - case DataWriteMode.UpsertStream: - ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath)); - return container.UpsertItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken); - case DataWriteMode.Upsert: - return container.UpsertItemAsync(item, cancellationToken: cancellationToken); - } + string? id = GetPropertyValue(item, "id"); + logger.LogTrace("Adding item {Id}", id); - throw new ArgumentOutOfRangeException(nameof(mode), $"Invalid data write mode specified: {mode}"); - }) + var task = retryPolicy.ExecuteAsync(() => PopulateItem(container, item, partitionKeyPath, mode, id, cancellationToken)) .ContinueWith(t => { - if (t.IsCompletedSuccessfully) + bool requestSucceeded = t.Result.IsSuccess; + if (t.IsCompletedSuccessfully && requestSucceeded) { - return 1; + return t.Result; } if (t.IsFaulted) { logger.LogWarning(t.Exception, "Error adding record: {ErrorMessage}", t.Exception?.Message); } + else if (!requestSucceeded) + { + logger.LogWarning(t.Exception, "Error adding record {Id}: {ErrorMessage}", t.Result.Id, t.Result.StatusCode); + return t.Result; + } - return 0; + return new ItemResult(null, HttpStatusCode.InternalServerError); }, cancellationToken); return task; } + private static async Task PopulateItem(Container container, ExpandoObject item, string? partitionKeyPath, DataWriteMode mode, string? itemId, CancellationToken cancellationToken) + { + HttpStatusCode? statusCode = null; + switch (mode) + { + case DataWriteMode.InsertStream: + ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath)); + var insertMessage = await container.CreateItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken); + statusCode = insertMessage.StatusCode; + break; + case DataWriteMode.Insert: + var insertResponse = await container.CreateItemAsync(item, cancellationToken: cancellationToken); + statusCode = insertResponse.StatusCode; + break; + case DataWriteMode.UpsertStream: + ArgumentNullException.ThrowIfNull(partitionKeyPath, nameof(partitionKeyPath)); + var upsertMessage = await container.UpsertItemStreamAsync(CreateItemStream(item), new PartitionKey(GetPropertyValue(item, partitionKeyPath.TrimStart('/'))), cancellationToken: cancellationToken); + statusCode = upsertMessage.StatusCode; + break; + case DataWriteMode.Upsert: + var upsertResponse = await container.UpsertItemAsync(item, cancellationToken: cancellationToken); + statusCode = upsertResponse.StatusCode; + break; + } + + if (statusCode == null) + { + throw new ArgumentOutOfRangeException(nameof(mode), $"Invalid data write mode specified: {mode}"); + } + + return new ItemResult(itemId, statusCode.Value); + } + private static MemoryStream CreateItemStream(ExpandoObject item) { var json = JsonConvert.SerializeObject(item); @@ -177,5 +204,11 @@ public IEnumerable GetSettings() { yield return new CosmosSinkSettings(); } + + public record ItemResult(string? Id, HttpStatusCode StatusCode) + { + public bool IsSuccess => StatusCode is HttpStatusCode.OK or HttpStatusCode.Created; + public int ItemCount => IsSuccess ? 1 : 0; + } } } \ No newline at end of file