Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for issue: CosmosDB to CosmosDB doesn't migrate "Id" field #100 #116

Merged
merged 3 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cosmos.DataTransfer.Interfaces;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cosmos.DataTransfer.CosmosExtension.UnitTests
{
Expand Down Expand Up @@ -54,5 +55,82 @@ public void BuildDynamicObjectTree_WithNestedArrays_WorksCorrectly()

Assert.AreEqual("sub2-1", secondSubArray[0].id);
}

[TestMethod]
public void BuildDynamicObjectTree_WithAnyCaseIds_UsesSourceIdValue()
{
var numeric = Random.Shared.Next();
var lower = Guid.NewGuid().ToString();
var upper = Guid.NewGuid().ToString();
var mixed = Guid.NewGuid().ToString();
var reversed = Guid.NewGuid().ToString();
var item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "id", numeric },
});

dynamic obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!;
Assert.AreEqual(numeric.ToString(), obj.id);

item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "id", lower },
});

obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!;
Assert.AreEqual(lower, obj.id);

item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "ID", upper },
});
obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!;
Assert.AreEqual(upper, obj.id);

item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "Id", mixed },
});
obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!;
Assert.AreEqual(mixed, obj.id);

item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "iD", reversed },
});
obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: false)!;
Assert.AreEqual(reversed, obj.id);
}

[TestMethod]
public void BuildDynamicObjectTree_WithPreservedMixedCaseIds_PassesThroughSourceValues()
{
var id = Random.Shared.Next();
var upper = Guid.NewGuid().ToString();
var mixed = Guid.NewGuid().ToString();
var item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "id", id },
{ "ID", upper },
{ "Id", mixed }
});

dynamic obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: true)!;
Assert.AreEqual(id.ToString(), obj.id);
Assert.AreEqual(upper, obj.ID);
Assert.AreEqual(mixed, obj.Id);

item = new CosmosDictionaryDataItem(new Dictionary<string, object?>()
{
{ "ID", upper },
{ "Id", mixed }
});
obj = item.BuildDynamicObjectTree(requireStringId: true, preserveMixedCaseIds: true)!;
Assert.AreEqual(upper, obj.ID);
Assert.AreEqual(mixed, obj.Id);
string? cosmosId = obj.id;
Assert.IsNotNull(cosmosId);
Assert.IsFalse(string.IsNullOrWhiteSpace(cosmosId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ void ReportCount(int i)
}
}

var convertedObjects = dataItems.Select(di => di.BuildDynamicObjectTree(requireStringId: true, ignoreNullValues: settings.IgnoreNullValues)).Where(o => o != null).OfType<ExpandoObject>();
var convertedObjects = dataItems
.Select(di => di.BuildDynamicObjectTree(requireStringId: true, ignoreNullValues: settings.IgnoreNullValues, preserveMixedCaseIds: settings.PreserveMixedCaseIds))
.Where(o => o != null)
.OfType<ExpandoObject>();
var batches = convertedObjects.Buffer(settings.BatchSize);
var retry = GetRetryPolicy(settings.MaxRetryCount, settings.InitialRetryDurationMs);
await foreach (var batch in batches.WithCancellation(cancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class CosmosSinkSettings : CosmosSettingsBase, IDataExtensionSettings
public bool UseAutoscaleForCreatedContainer { get; set; } = true;
public bool IsServerlessAccount { get; set; } = false;
public bool UseSharedThroughput { get; set; } = false;
public bool PreserveMixedCaseIds { get; set; } = false;
public DataWriteMode WriteMode { get; set; } = DataWriteMode.Insert;
public bool IgnoreNullValues { get; set; } = false;
public List<string>? PartitionKeyPaths { get; set; }
Expand Down
3 changes: 2 additions & 1 deletion Extensions/Cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Or with RBAC:
}
```

Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present. The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting. `ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service. For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`. The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, or `Upsert`. The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment.
Sink requires an additional `PartitionKeyPath` parameter which is used when creating the container if it does not exist. To use hierarchical partition keys, instead use the `PartitionKeyPaths` setting to supply an array of up to 3 paths. It also supports an optional `RecreateContainer` parameter (`false` by default) to delete and then recreate the container to ensure only newly imported data is present. The optional `BatchSize` parameter (100 by default) sets the number of items to accumulate before inserting. `ConnectionMode` can be set to either `Gateway` (default) or `Direct` to control how the client connects to the CosmosDB service. For situations where a container is created as part of the transfer operation `CreatedContainerMaxThroughput` (in RUs) and `UseAutoscaleForCreatedContainer` provide the initial throughput settings which will be in effect when executing the transfer. To instead use shared throughput that has been provisioned at the database level, set the `UseSharedThroughput` parameter to `true`. The optional `WriteMode` parameter specifies the type of data write to use: `InsertStream`, `Insert`, `UpsertStream`, or `Upsert`. The `IsServerlessAccount` parameter specifies whether the target account uses Serverless instead of Provisioned throughput, which affects the way containers are created. Additional parameters allow changing the behavior of the Cosmos client appropriate to your environment. The `PreserveMixedCaseIds` parameter (`false` by default) ignores differently cased `id` fields and writes them through without modification, while generating a separate lowercased `id` field as required by Cosmos.

### Sink

Expand All @@ -62,6 +62,7 @@ Sink requires an additional `PartitionKeyPath` parameter which is used when crea
"CreatedContainerMaxThroughput": 1000,
"UseAutoscaleForCreatedContainer": true,
"WriteMode": "InsertStream",
"PreserveMixedCaseIds": false,
"IsServerlessAccount": false,
"UseSharedThroughput": false
}
Expand Down
40 changes: 30 additions & 10 deletions Interfaces/Cosmos.DataTransfer.Interfaces/DataItemExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ public static class DataItemExtensions
/// </summary>
/// <param name="source"></param>
/// <param name="requireStringId">If true, adds a new GUID "id" field to any top level items where one is not already present.</param>
/// <param name="preserveMixedCaseIds">If true, disregards differently cased "id" fields for purposes of required "id" and passes them through.</param>
/// <returns>A dynamic object containing the entire data structure.</returns>
/// <remarks>The returned ExpandoObject can be used directly as an IDictionary.</remarks>
public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false, bool ignoreNullValues = false)
public static ExpandoObject? BuildDynamicObjectTree(this IDataItem? source, bool requireStringId = false, bool ignoreNullValues = false, bool preserveMixedCaseIds = false)
{
if (source == null)
{
Expand All @@ -20,20 +21,25 @@ public static class DataItemExtensions

var fields = source.GetFieldNames().ToList();
var item = new ExpandoObject();

/*
* If the item contains a lowercase id field, we can take it as is.
* If we have an uppercase Id or ID field, but no lowercase id, we will rename it to id.
* If we have an uppercase Id or ID field, but no lowercase id, we will rename it to id, unless `preserveMixedCaseIds` is set to true.
* If `preserveMixedCaseIds` is set to true, any differently cased "id" fields will be passed through as normal properties with no casing change and a separate "id" will be generated.
* Then it can be used i.e. as CosmosDB primary key, when `requireStringId` is set to true.
*/
var containsLowercaseIdField = fields.Contains("id", StringComparer.CurrentCulture);
var containsAnyIdField = fields.Contains("id", StringComparer.CurrentCultureIgnoreCase);
if (requireStringId && !containsAnyIdField)

if (requireStringId)
{
item.TryAdd("id", Guid.NewGuid().ToString());
bool mismatchedIdCasing = preserveMixedCaseIds && !containsLowercaseIdField;
if (!containsAnyIdField || mismatchedIdCasing)
{
item.TryAdd("id", Guid.NewGuid().ToString());
}
}

foreach (string field in fields)
{
object? value = source.GetValue(field);
Expand All @@ -43,10 +49,24 @@ public static class DataItemExtensions
}

var fieldName = field;
if (string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase) && requireStringId && !containsLowercaseIdField)
if (requireStringId && string.Equals(field, "id", StringComparison.CurrentCultureIgnoreCase))
{
value = value?.ToString();
fieldName = "id";
if (preserveMixedCaseIds)
{
if (string.Equals(field, "id", StringComparison.CurrentCulture))
{
value = value?.ToString();
}
}
else if (!containsLowercaseIdField)
{
value = value?.ToString();
fieldName = "id";
}
else
{
value = value?.ToString();
}
}
else if (value is IDataItem child)
{
Expand Down
Loading