Skip to content

Commit

Permalink
Feature: MergeManyChangeSets with Parent Item Comparison (#750)
Browse files Browse the repository at this point in the history
Implementation of Parent Sorted MergeManyChangeSets
  • Loading branch information
dwcullop committed Nov 7, 2023
1 parent 0434568 commit 269828b
Show file tree
Hide file tree
Showing 11 changed files with 1,851 additions and 481 deletions.
324 changes: 65 additions & 259 deletions src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs

Large diffs are not rendered by default.

317 changes: 104 additions & 213 deletions src/DynamicData.Tests/Cache/MergeManyCacheChangeSetsFixture.cs

Large diffs are not rendered by default.

Large diffs are not rendered by default.

133 changes: 133 additions & 0 deletions src/DynamicData.Tests/Domain/Market.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using DynamicData.Kernel;
using DynamicData.Tests.Utilities;

namespace DynamicData.Tests.Domain;

internal interface IMarket
{
public string Name { get; }

public double Rating { get; set; }

public Guid Id { get; }

public IObservable<IChangeSet<MarketPrice, int>> LatestPrices { get; }
}

internal class Market : IMarket, IDisposable
{
private readonly ISourceCache<MarketPrice, int> _latestPrices = new SourceCache<MarketPrice, int>(p => p.ItemId);

public static IComparer<IMarket> RatingCompare { get; } = new RatingComparer();

private Market(string name, Guid id)
{
Name = name;
Id = id;
}

public Market(Market market) : this(market.Name, market.Id)
{
}

public Market(int name) : this($"Market #{name}", Guid.NewGuid())
{
}

public string Name { get; }

public Guid Id { get; }

public double Rating { get; set; }

public IObservable<IChangeSet<MarketPrice, int>> LatestPrices => _latestPrices.Connect();

public ISourceCache<MarketPrice, int> PricesCache => _latestPrices;

public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id);

public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func<decimal> randPrices)
{
_latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, randPrices())));
return this;
}

public Market AddUniquePrices(int section, int count, int stride, Func<decimal> getPrice) => SetPrices(section * stride, section * stride + count, getPrice);

public Market RefreshPrice(int id, decimal newPrice) => this.With(_ =>
_latestPrices.Edit(updater => updater.Lookup(id).IfHasValue(cp =>
{
cp.Price = newPrice;
updater.Refresh(cp);
})));

public Market RefreshAllPrices(Func<int, decimal> getNewPrice) => this.With(_ =>
_latestPrices.Edit(updater => updater.Items.ForEach(cp =>
{
cp.Price = getNewPrice(cp.ItemId);
updater.Refresh(cp);
})));

public Market RefreshAllPrices(Func<decimal> getNewPrice) => RefreshAllPrices(_ => getNewPrice());

public Market RefreshAllPrices(decimal newPrice) => RefreshAllPrices(_ => newPrice);

public void RemoveAllPrices() => this.With(_ => _latestPrices.Clear());

public void RemovePrice(int itemId) => this.With(_ => _latestPrices.Remove(itemId));

public Market UpdateAllPrices(Func<int, decimal> getNewPrice) => this.With(_ =>
_latestPrices.Edit(updater => updater.AddOrUpdate(updater.Items.Select(cp => CreatePrice(cp.ItemId, getNewPrice(cp.ItemId))))));

public Market UpdateAllPrices(Func<decimal> getNewPrice) => UpdateAllPrices(_ => getNewPrice());

public Market UpdateAllPrices(decimal newPrice) => UpdateAllPrices(_ => newPrice);

public Market SetPrices(int minId, int maxId, Func<int, decimal> getPrice) => this.With(_ =>
_latestPrices.AddOrUpdate(Enumerable.Range(minId, maxId - minId).Select(id => CreatePrice(id, getPrice(id)))));

public Market SetPrices(int minId, int maxId, Func<decimal> getPrice) => SetPrices(minId, maxId, i => getPrice());

public Market SetPrices(int minId, int maxId, decimal newPrice) => SetPrices(minId, maxId, _ => newPrice);

public void Dispose() => _latestPrices.Dispose();

public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})";

private class RatingComparer : IComparer<IMarket>
{
public int Compare([DisallowNull] IMarket x, [DisallowNull] IMarket y)
{
// Higher ratings go first
return y.Rating.CompareTo(x.Rating);
}
}
}


internal class FixedMarket : IMarket
{
public FixedMarket(Func<decimal> getPrice, int minId, int maxId, bool completable = true)
{
Id = Guid.NewGuid();
LatestPrices = Enumerable.Range(minId, maxId - minId)
.Select(id => new MarketPrice(id, getPrice(), Id))
.AsObservableChangeSet(cp => cp.ItemId, completable: completable);
}

public IObservable<IChangeSet<MarketPrice, int>> LatestPrices { get; }

public string Name => Id.ToString("B");

public double Rating { get; set; }

public Guid Id { get; }

public override string ToString() => $"Fixed Market '{Name}' (Rating: {Rating})";
}
82 changes: 82 additions & 0 deletions src/DynamicData.Tests/Domain/MarketPrice.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace DynamicData.Tests.Domain;

internal class MarketPrice
{
public static IEqualityComparer<MarketPrice> EqualityComparer { get; } = new CurrentPriceEqualityComparer();
public static IEqualityComparer<MarketPrice> EqualityComparerWithTimeStamp { get; } = new TimeStampPriceEqualityComparer();
public static IComparer<MarketPrice> HighPriceCompare { get; } = new HighestPriceComparer();
public static IComparer<MarketPrice> LowPriceCompare { get; } = new LowestPriceComparer();
public static IComparer<MarketPrice> LatestPriceCompare { get; } = new LatestPriceComparer();

private decimal _price;

public MarketPrice(int itemId, decimal price, Guid marketId)
{
ItemId = itemId;
MarketId = marketId;
Price = price;
}

public decimal Price
{
get => _price;
set
{
_price = value;
TimeStamp = DateTimeOffset.UtcNow;
}
}

public DateTimeOffset TimeStamp { get; private set; }

public Guid MarketId { get; }

public int ItemId { get; }

public override string ToString() => $"{ItemId:D5} - {Price:c} ({MarketId}) [{TimeStamp:HH:mm:ss.fffffff}]";

public static decimal RandomPrice(Random r, decimal basePrice, decimal offset) => basePrice + (decimal)r.NextDouble() * offset;

private class CurrentPriceEqualityComparer : IEqualityComparer<MarketPrice>
{
public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price;
public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException();
}

private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer<MarketPrice>
{
public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && x.TimeStamp == y.TimeStamp;
}

private class LowestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Debug.Assert(x.ItemId == y.ItemId);
return x.Price.CompareTo(y.Price);
}
}

private class HighestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Debug.Assert(x.ItemId == y.ItemId);
return y.Price.CompareTo(x.Price);
}
}

private class LatestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Debug.Assert(x.ItemId == y.ItemId);
return y.TimeStamp.CompareTo(x.TimeStamp);
}
}
}
32 changes: 32 additions & 0 deletions src/DynamicData.Tests/Utilities/ComparerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

namespace DynamicData.Tests.Utilities;

internal class NoOpComparer<T> : IComparer<T>
{
public int Compare(T x, T y) => throw new NotImplementedException();
}

internal class NoOpEqualityComparer<T> : IEqualityComparer<T>
{
public bool Equals(T x, T y) => throw new NotImplementedException();
public int GetHashCode([DisallowNull] T obj) => throw new NotImplementedException();
}


internal class InvertedComparer<T> : IComparer<T>
{
private readonly IComparer<T> _original;

public InvertedComparer(IComparer<T> original) => _original = original;

public int Compare(T x, T y) => _original.Compare(x, y) * -1;
}


internal static class ComparerExtensions
{
public static IComparer<T> Invert<T>(this IComparer<T> comparer) => new InvertedComparer<T>(comparer);
}
12 changes: 12 additions & 0 deletions src/DynamicData.Tests/Utilities/FunctionalExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;

namespace DynamicData.Tests.Utilities;

internal static class FunctionalExtensions
{
public static T With<T>(this T item, Action<T> action)
{
action(item);
return item;
}
}
21 changes: 21 additions & 0 deletions src/DynamicData.Tests/Utilities/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Linq;
using System.Reactive.Linq;

namespace DynamicData.Tests.Utilities;

internal static class ObservableExtensions
{
/// <summary>
/// Forces the given observable to fail after the specified number events if an exception is provided.
/// </summary>
/// <typeparam name="T">Observable type.</typeparam>
/// <param name="source">Source Observable.</param>
/// <param name="count">Number of events before failing.</param>
/// <param name="e">Exception to fail with.</param>
/// <returns>The new Observable.</returns>
public static IObservable<T> ForceFail<T>(this IObservable<T> source, int count, Exception? e) =>
e is not null
? source.Take(count).Concat(Observable.Throw<T>(e))
: source;
}
59 changes: 51 additions & 8 deletions src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ public void RemoveItems(IEnumerable<KeyValuePair<TKey, TObject>> items, IObserve
EmitChanges(observer);
}

public void RefreshItems(IEnumerable<TKey> keys, IObserver<IChangeSet<TObject, TKey>> observer)
{
var sourceCaches = _selectCaches().ToArray();

// Update the Published Value for each item being removed
if (keys is IList<TKey> list)
{
// zero allocation enumerator
foreach (var key in EnumerableIList.Create(list))
{
ForceEvaluate(sourceCaches, key);
}
}
else
{
foreach (var key in keys)
{
ForceEvaluate(sourceCaches, key);
}
}

EmitChanges(observer);
}

public void ProcessChangeSet(IChangeSet<TObject, TKey> changes, IObserver<IChangeSet<TObject, TKey>> observer)
{
var sourceCaches = _selectCaches().ToArray();
Expand Down Expand Up @@ -125,25 +149,30 @@ private void OnItemUpdated(ChangeSetCache<TObject, TKey>[] sources, TObject item
return;
}

// If the Previous value is missing or is the same as the current value
bool isUpdatingCurrent = !prev.HasValue || CheckEquality(prev.Value, cached.Value);

if (_comparer is null)
{
// If the current value (or there is no way to tell) is being replaced by a different value
if ((!prev.HasValue || CheckEquality(prev.Value, cached.Value)) && !CheckEquality(item, cached.Value))
// If not using the comparer and the current value is being replaced by a different value
if (isUpdatingCurrent && !CheckEquality(item, cached.Value))
{
// Update to the new value
_resultCache.AddOrUpdate(item, key);
}
}
else
{
// The current value is being replaced (or there is no way to tell), so do a full update to select the best one from all the choices
if (!prev.HasValue || CheckEquality(prev.Value, cached.Value))
// If using the comparer and the current value is one being updated
if (isUpdatingCurrent)
{
// The known best value has been replaced, so pick a new one from all the choices
UpdateToBestValue(sources, key, cached);
}
else
{
// If the current value isn't being replaced, check to see if the replacement value is better than the current one
// If the current value isn't being replaced, its only required to check to see if the
// new value is better than the current one
if (ShouldReplace(item, cached.Value))
{
_resultCache.AddOrUpdate(item, key);
Expand Down Expand Up @@ -172,10 +201,24 @@ private void OnItemRefreshed(ChangeSetCache<TObject, TKey>[] sources, TObject it
}
}

private void ForceEvaluate(ChangeSetCache<TObject, TKey>[] sources, TKey key)
{
var cached = _resultCache.Lookup(key);

// Received a refresh change for a key that hasn't been seen yet
// Nothing can be done, so ignore it
if (!cached.HasValue)
{
return;
}

UpdateToBestValue(sources, key, cached);
}

private bool UpdateToBestValue(ChangeSetCache<TObject, TKey>[] sources, TKey key, Optional<TObject> current)
{
// Determine which value should be the one seen downstream
var candidate = SelectValue(sources, key);
var candidate = LookupBestValue(sources, key);
if (candidate.HasValue)
{
// If there isn't a current value
Expand All @@ -201,7 +244,7 @@ private bool UpdateToBestValue(ChangeSetCache<TObject, TKey>[] sources, TKey key
return true;
}

private Optional<TObject> SelectValue(ChangeSetCache<TObject, TKey>[] sources, TKey key)
private Optional<TObject> LookupBestValue(ChangeSetCache<TObject, TKey>[] sources, TKey key)
{
if (sources.Length == 0)
{
Expand All @@ -219,7 +262,7 @@ private Optional<TObject> SelectValue(ChangeSetCache<TObject, TKey>[] sources, T
}

private bool CheckEquality(TObject left, TObject right) =>
ReferenceEquals(left, right) || (_equalityComparer?.Equals(left, right) ?? (_comparer?.Compare(left, right) == 0));
ReferenceEquals(left, right) || (_equalityComparer?.Equals(left, right) ?? false);

// Return true if candidate should replace current as the observed downstream value
private bool ShouldReplace(TObject candidate, TObject current) =>
Expand Down
Loading

0 comments on commit 269828b

Please sign in to comment.