diff --git a/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs b/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs index 72a1538be..14cdbcef3 100644 --- a/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs @@ -565,7 +565,7 @@ public void EnumObservableUsesTheSchedulerAndEmitsAll() using var results = pricesCache.Connect().AsAggregator(); // when - scheduler.AdvanceBy(1); + scheduler.AdvanceBy(MarketCount); // then _marketList.Count.Should().Be(MarketCount); diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index ccc897f65..a2abcc4f5 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -53,9 +53,11 @@ public MergeManyChangeSetsCacheFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] - [InlineData(5, 100)] +#if !DEBUG + [InlineData(10, 1_000)] [InlineData(200, 500)] - [InlineData(100, 5)] + [InlineData(1_000, 10)] +#endif public async Task MultiThreadedStressTest(int marketCount, int priceCount) { var MaxAddTime = TimeSpan.FromSeconds(0.250); @@ -65,23 +67,19 @@ public async Task MultiThreadedStressTest(int marketCount, int priceCount) IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => Observable.Create(observer => new CompositeDisposable - { + ( AddRemoveMarkets(marketCount, parallel, scheduler) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex)), + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError), _marketCache.Connect() - .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex), - onCompleted: () => - { - observer.OnNext(Unit.Default); - observer.OnCompleted(); - }) - }); + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: observer.OnCompleted) + )); IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => _marketFaker.IntervalGenerate(MaxAddTime, scheduler) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 9a1416bd8..60f27a4c7 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -50,10 +50,9 @@ public MergeManyChangeSetsCacheSourceCompareFixture() } [Theory] -#if DEBUG [InlineData(5, 7)] [InlineData(10, 50)] -#else +#if false && !DEBUG [InlineData(10, 1_000)] [InlineData(100, 100)] [InlineData(1_000, 10)] @@ -91,7 +90,7 @@ IObservable AddRemovePrices(Market market, int priceCount, int para .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare); + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true); var adding = true; using var priceResults = merged.AsAggregator(); diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs index d5fe81e3d..6211e141e 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs @@ -51,9 +51,11 @@ public MergeManyChangeSetsListFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] +#if !DEBUG [InlineData(10, 1_000)] [InlineData(200, 500)] [InlineData(1_000, 10)] +#endif public async Task MultiThreadedStressTest(int ownerCount, int animalCount) { var MaxAddTime = TimeSpan.FromSeconds(0.250); @@ -63,23 +65,19 @@ public async Task MultiThreadedStressTest(int ownerCount, int animalCount) IObservable AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) => Observable.Create(observer => new CompositeDisposable - { - AddRemoveOwners(ownerCount, parallel, scheduler) - .Subscribe( - onNext: static _ => { }, - onError: ex => observer.OnError(ex)), + ( + AddRemoveOwners(ownerCount, parallel, scheduler) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError), - _animalOwners.Connect() + _animalOwners.Connect() .MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler)) .Subscribe( onNext: static _ => { }, - onError: ex => observer.OnError(ex), - onCompleted: () => - { - observer.OnNext(Unit.Default); - observer.OnCompleted(); - }) - }); + onError: observer.OnError, + onCompleted: observer.OnCompleted) + )); IObservable AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) => _animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler) diff --git a/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs b/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs index 9f30052d1..aabb041d0 100644 --- a/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs +++ b/src/DynamicData.Tests/List/MergeChangeSetsFixture.cs @@ -44,9 +44,11 @@ public MergeChangeSetsFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] - [InlineData(10, 100)] - [InlineData(200, 50)] - [InlineData(100, 10)] +#if !DEBUG + [InlineData(10, 1_000)] + [InlineData(200, 500)] + [InlineData(1_000, 10)] +#endif public async Task MultiThreadedStressTest(int ownerCount, int animalCount) { var MaxAddTime = TimeSpan.FromSeconds(0.250); @@ -383,7 +385,7 @@ public void EnumObservableUsesTheScheduler(bool advance) // Act if (advance) { - scheduler.AdvanceBy(1); + scheduler.AdvanceBy(InitialOwnerCount); } // Assert diff --git a/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs index 1cf96a1d4..91313f5b2 100644 --- a/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs @@ -52,9 +52,11 @@ public MergeManyChangeSetsCacheFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] - [InlineData(5, 100)] +#if !DEBUG + [InlineData(10, 1_000)] [InlineData(200, 500)] - [InlineData(100, 5)] + [InlineData(1_000, 10)] +#endif public async Task MultiThreadedStressTest(int marketCount, int priceCount) { var MaxAddTime = TimeSpan.FromSeconds(0.250); @@ -64,23 +66,19 @@ public async Task MultiThreadedStressTest(int marketCount, int priceCount) IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => Observable.Create(observer => new CompositeDisposable - { + ( AddRemoveMarkets(marketCount, parallel, scheduler) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex)), + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError), _marketList.Connect() - .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex), - onCompleted: () => - { - observer.OnNext(Unit.Default); - observer.OnCompleted(); - }) - }); + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: observer.OnCompleted) + )); IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => _marketFaker.IntervalGenerate(MaxAddTime, scheduler) @@ -521,7 +519,7 @@ public void ComparerOnlyAddsBetterValuesOnSourceReplace() using var lowPriceResults = _marketList.Connect().DebugSpy("List").MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).DebugSpy("MergedLow").AsAggregator(); var marketOriginal = new Market(0); var marketLow = new Market(1); - var marketLowLow = new Market(marketLow); + var marketLowLow = new Market(2); marketOriginal.SetPrices(0, PricesPerMarket, GetRandomPrice); marketLow.SetPrices(0, PricesPerMarket, LowestPrice); marketLowLow.SetPrices(0, PricesPerMarket, LowestPrice - 1); diff --git a/src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs b/src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs index f204d1208..3fd868bf6 100644 --- a/src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs +++ b/src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs @@ -48,9 +48,11 @@ public MergeManyChangeSetsListFixture() [Theory] [InlineData(5, 7)] [InlineData(10, 50)] +#if !DEBUG [InlineData(10, 1_000)] [InlineData(200, 500)] [InlineData(1_000, 10)] +#endif public async Task MultiThreadedStressTest(int ownerCount, int animalCount) { var MaxAddTime = TimeSpan.FromSeconds(0.250); @@ -60,23 +62,19 @@ public async Task MultiThreadedStressTest(int ownerCount, int animalCount) IObservable AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) => Observable.Create(observer => new CompositeDisposable - { - AddRemoveOwners(ownerCount, parallel, scheduler) - .Subscribe( - onNext: static _ => { }, - onError: ex => observer.OnError(ex)), + ( + AddRemoveOwners(ownerCount, parallel, scheduler) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError), - _animalOwners.Connect() + _animalOwners.Connect() .MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler)) .Subscribe( onNext: static _ => { }, - onError: ex => observer.OnError(ex), - onCompleted: () => - { - observer.OnNext(Unit.Default); - observer.OnCompleted(); - }) - }); + onError: observer.OnError, + onCompleted: observer.OnCompleted) + )); IObservable AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) => _animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler) diff --git a/src/DynamicData/Cache/Internal/MergeChangeSets.cs b/src/DynamicData/Cache/Internal/MergeChangeSets.cs index ff6220830..79b7fde0c 100644 --- a/src/DynamicData/Cache/Internal/MergeChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeChangeSets.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for full license information. using System.Reactive.Concurrency; -using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Cache.Internal; @@ -11,83 +10,53 @@ namespace DynamicData.Cache.Internal; /// /// Operator that is similiar to Merge but intelligently handles Cache ChangeSets. /// -internal sealed class MergeChangeSets +internal sealed class MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer, IComparer? comparer) where TObject : notnull where TKey : notnull { - private readonly IObservable, int>> _source; - - private readonly IComparer? _comparer; - - private readonly IEqualityComparer? _equalityComparer; - public MergeChangeSets(IEnumerable>> source, IEqualityComparer? equalityComparer, IComparer? comparer, bool completable, IScheduler? scheduler = null) - : this(CreateContainerObservable(source, completable, scheduler), equalityComparer, comparer) - { - } - - public MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer, IComparer? comparer) - : this(CreateContainerObservable(source), equalityComparer, comparer) + : this(CreateObservable(source, completable, scheduler), equalityComparer, comparer) { } - private MergeChangeSets(IObservable, int>> source, IEqualityComparer? equalityComparer, IComparer? comparer) - { - _source = source; - _comparer = comparer; - _equalityComparer = equalityComparer; - } - public IObservable> Run() => Observable.Create>( - observer => - { - var locker = new object(); - - // Create a local cache of Merge Containers - var localCache = _source.Synchronize(locker).AsObservableCache(); - - // Set up the change tracker - var changeTracker = new ChangeSetMergeTracker(() => localCache.Items, _comparer, _equalityComparer); - - // Merge all of the changeset streams together and Process them with the change tracker which will emit the results - var subscription = localCache.Connect().MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError)) - .Synchronize(locker) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); - - return new CompositeDisposable(localCache, subscription); - }); + observer => + { + var locker = new object(); + var cache = new Cache, int>(); + + // This is manages all of the changes + var changeTracker = new ChangeSetMergeTracker(() => cache.Items, comparer, equalityComparer); + + // Create a ChangeSet of Caches, synchronize, update the local copy, and merge the sub-observables together. + return CreateContainerObservable(source, locker) + .Synchronize(locker) + .Do(cache.Clone) + .MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError)) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + }); // Can optimize for the Add case because that's the only one that applies - private static Change, int> CreateChange(IObservable> source, int index) => - new(ChangeReason.Add, index, new ChangeSetCache(source)); + private static Change, int> CreateChange(IObservable> source, int index, object locker) => + new(ChangeReason.Add, index, new ChangeSetCache(source.Synchronize(locker))); // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private static IObservable, int>> CreateContainerObservable(IObservable>> source) => - source.Select((src, index) => new ChangeSet, int>(new[] { CreateChange(src, index) })); + private static IObservable, int>> CreateContainerObservable(IObservable>> source, object locker) => + source.Select((src, index) => new ChangeSet, int>(new[] { CreateChange(src, index, locker) })); // Create a ChangeSet Observable with a single event that adds all the values in the enum (and then completes, maybe) - private static IObservable, int>> CreateContainerObservable(IEnumerable>> source, bool completable, IScheduler? scheduler = null) => - Observable.Create, int>>(observer => - { - void EmitChanges() - { - observer.OnNext(new ChangeSet, int>(source.Select(CreateChange))); - - if (completable) - { - observer.OnCompleted(); - } - } + private static IObservable>> CreateObservable(IEnumerable>> source, bool completable, IScheduler? scheduler = null) + { + var obs = (scheduler != null) ? source.ToObservable(scheduler) : source.ToObservable(); - if (scheduler is not null) - { - return scheduler.Schedule(EmitChanges); - } + if (!completable) + { + obs = obs.Concat(Observable.Never>>()); + } - EmitChanges(); - return Disposable.Empty; - }); + return obs; + } } diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs index 2aba0cab3..fdfed529e 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -17,35 +17,35 @@ internal sealed class MergeManyCacheChangeSets> Run() => Observable.Create>( - observer => - { - var locker = new object(); - - // Transform to an observable changeset of cached changesets - var sourceCacheOfCaches = source - .Transform((obj, key) => new ChangeSetCache(selector(obj, key).Synchronize(locker))) - .AsObservableCache(); - - // This is manages all of the changes - var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, comparer, equalityComparer); - - // Share a connection to the source cache - var shared = sourceCacheOfCaches.Connect().Publish(); - - // Merge the child changeset changes together and apply to the tracker - var allChanges = shared.MergeMany(mc => mc.Source) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); - - // When a source item is removed, all of its sub-items need to be removed - var removedItems = shared - .Synchronize(locker) - .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) - .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) - .Subscribe(); - - return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, shared.Connect()); - }); + observer => + { + var locker = new object(); + var cache = new Cache, TKey>(); + + // This is manages all of the changes + var changeTracker = new ChangeSetMergeTracker(() => cache.Items, comparer, equalityComparer); + + // Transform to a cache changeset of child caches, synchronize, update the local copy, and publish. + var shared = source + .Transform((obj, key) => new ChangeSetCache(selector(obj, key).Synchronize(locker))) + .Synchronize(locker) + .Do(cache.Clone) + .Publish(); + + // Merge the child changeset changes together and apply to the tracker + var subMergeMany = shared + .MergeMany(cacheChangeSet => cacheChangeSet.Source) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + + // When a source item is removed, all of its sub-items need to be removed + var subRemove = shared + .OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues, observer), invokeOnUnsubscribe: false) + .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) + .Subscribe(); + + return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove); + }); } diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs index 5d09bf087..4dc3a4272 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs @@ -24,44 +24,42 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare? _equalityComparer = (equalityComparer != null) ? new ParentChildEqualityCompare(equalityComparer) : null; public IObservable> Run() => Observable.Create>( - observer => - { - var locker = new object(); - - // Transform to an observable cache of merge containers. - var sourceCacheOfCaches = source - .Transform((obj, key) => new ChangeSetCache(_changeSetSelector(obj, key).Synchronize(locker))) - .AsObservableCache(); - - // Share a single connection to the cache - var shared = sourceCacheOfCaches.Connect().Synchronize(locker).Publish(); - - // This is manages all of the changes - var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer); - - // Merge the child changeset changes together and apply to the tracker - var allChanges = shared.MergeMany(mc => mc.Source) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); - - // When a source item is removed, all of its sub-items need to be removed - var removedItems = shared - .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) - .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) - .Subscribe(); - - // If requested, when the source sees a refresh event, re-evaluate all the keys associated with that source because the priority may have changed - // Because the comparison is based on the parent, which has just been refreshed. - var refreshItems = reevalOnRefresh - ? shared - .OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer)) - .Subscribe() - : Disposable.Empty; - - return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, refreshItems, shared.Connect()); - }).Select(changes => changes.Transform(entry => entry.Child)); + observer => + { + var locker = new object(); + var cache = new Cache, TKey>(); + + // This is manages all of the changes + var changeTracker = new ChangeSetMergeTracker(() => cache.Items, _comparer, _equalityComparer); + + // Transform to an cache changeset of child caches of ParentChildEntry, synchronize, update the local copy, and publish. + var shared = source + .Transform((obj, key) => new ChangeSetCache(_changeSetSelector(obj, key).Synchronize(locker))) + .Synchronize(locker) + .Do(cache.Clone) + .Publish(); + + // Merge the child changeset changes together and apply to the tracker + var subMergeMany = shared + .MergeMany(changeSetCache => changeSetCache.Source) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + + // When a source item is removed, all of its sub-items need to be removed + var subRemove = shared + .OnItemRemoved(cacheChangeSet => changeTracker.RemoveItems(cacheChangeSet.Cache.KeyValues, observer), invokeOnUnsubscribe: false) + .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) + .Subscribe(); + + // Optionally attach a handler for Refresh events + var subRefresh = reevalOnRefresh + ? shared.OnItemRefreshed(cacheChangeSet => changeTracker.RefreshItems(cacheChangeSet.Cache.Keys, observer)).Subscribe() + : Disposable.Empty; + + return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove, subRefresh); + }).Select(changes => changes.Transform(entry => entry.Child)); private sealed class ParentChildEntry(TObject parent, TDestination child) { diff --git a/src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs index 24f59d449..e50197fa1 100644 --- a/src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs @@ -24,25 +24,26 @@ public IObservable> Run() => Observable.Create(); - // Transform to a changeset of Cloned Child Lists and Share + // Transform to a cache changeset of child lists, synchronize, and publish. var shared = source - .Transform((obj, key) => new ClonedListChangeSet(selector(obj, key).Synchronize(locker), equalityComparer)) - .Publish(); - - // Merge all of the children changesets together and apply to the tracker - var allChanges = shared.MergeMany(clonedList => clonedList.Source.RemoveIndex()) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); + .Transform((obj, key) => new ClonedListChangeSet(selector(obj, key).Synchronize(locker), equalityComparer)) + .Synchronize(locker) + .Publish(); + + // Merge the child changeset changes together and apply to the tracker + var subMergeMany = shared + .MergeMany(clonedList => clonedList.Source.RemoveIndex()) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); // When a source item is removed, all of its sub-items need to be removed - var removedItems = shared - .Synchronize(locker) - .OnItemRemoved(clonedList => changeTracker.RemoveItems(clonedList.List, observer), invokeOnUnsubscribe: false) - .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.List, observer)) - .Subscribe(); + var subRemove = shared + .OnItemRemoved(clonedList => changeTracker.RemoveItems(clonedList.List, observer), invokeOnUnsubscribe: false) + .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.List, observer)) + .Subscribe(); - return new CompositeDisposable(allChanges, removedItems, shared.Connect()); + return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove); }); } diff --git a/src/DynamicData/List/Internal/MergeChangeSets.cs b/src/DynamicData/List/Internal/MergeChangeSets.cs index f92b00e93..eb89e96df 100644 --- a/src/DynamicData/List/Internal/MergeChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeChangeSets.cs @@ -10,60 +10,49 @@ namespace DynamicData.List.Internal; /// /// Operator that is similiar to Merge but intelligently handles List ChangeSets. /// -internal sealed class MergeChangeSets +internal sealed class MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer) where TObject : notnull { - private readonly IObservable>> _source; - - private readonly IEqualityComparer? _equalityComparer; - public MergeChangeSets(IEnumerable>> source, IEqualityComparer? equalityComparer, bool completable, IScheduler? scheduler = null) + : this(CreateObservable(source, completable, scheduler), equalityComparer) { - _equalityComparer = equalityComparer; - _source = CreateClonedListObservable(source, completable, scheduler); } - public MergeChangeSets(IObservable>> source, IEqualityComparer? equalityComparer) + public IObservable> Run() => Observable.Create>( + observer => + { + var locker = new object(); + + // This is manages all of the changes + var changeTracker = new ChangeSetMergeTracker(); + + // Merge all of the changeset streams together and Process them with the change tracker which will emit the results + return CreateClonedListObservable(source, locker) + .Synchronize(locker) + .MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError)) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + }); + + private static IObservable>> CreateObservable(IEnumerable>> source, bool completable, IScheduler? scheduler) { - _equalityComparer = equalityComparer; - _source = CreateClonedListObservable(source); - } + var obs = (scheduler != null) ? source.ToObservable(scheduler) : source.ToObservable(); - public IObservable> Run() => Observable.Create>( - observer => - { - // This is manages all of the changes - var changeTracker = new ChangeSetMergeTracker(); + if (!completable) + { + obs = obs.Concat(Observable.Never>>()); + } - // Merge all of the changeset streams together and Process them with the change tracker which will emit the results - return _source.MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError)) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); - }); + return obs; + } // Can optimize for the Add case because that's the only one that applies - private Change> CreateChange(IObservable> source) => - new(ListChangeReason.Add, new ClonedListChangeSet(source, _equalityComparer)); + private Change> CreateChange(IObservable> source, object locker) => + new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer)); // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private IObservable>> CreateClonedListObservable(IObservable>> source) => - source.Select(src => new ChangeSet>(new[] { CreateChange(src) })); - - // Create a ChangeSet Observable with a single event that adds all the values in the enum (and then completes, maybe) - private IObservable>> CreateClonedListObservable(IEnumerable>> source, bool completable, IScheduler? scheduler = null) - { - // Create a changeset that has a change for each changeset in the enumerable - var changeSet = new ChangeSet>(source.Select(CreateChange)); - - // Create an observable that returns it (using the scheduler if provided) - var observable = - scheduler is IScheduler sch - ? Observable.Return>>(changeSet, sch) - : Observable.Return(changeSet); - - // Block completion if it isn't supposed to complete - return completable ? observable : observable.Concat(Observable.Never>>()); - } + private IObservable>> CreateClonedListObservable(IObservable>> source, object locker) => + source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) })); } diff --git a/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs index fa1f28e0e..0de3c5afd 100644 --- a/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs @@ -21,33 +21,31 @@ public IObservable> Run() => observer => { var locker = new object(); - - // Transform to an observable list of merge containers. - var sourceListOfCaches = source - .Transform(obj => new ChangeSetCache(changeSetSelector(obj).Synchronize(locker))) - .AsObservableList(); + var list = new List>(); // This is manages all of the changes - var changeTracker = new ChangeSetMergeTracker(() => sourceListOfCaches.Items.ToArray(), comparer, equalityComparer); + var changeTracker = new ChangeSetMergeTracker(() => list, comparer, equalityComparer); - // Share a connection to the source list - var shared = sourceListOfCaches.Connect().Publish(); + // Transform to a list changeset of child caches, synchronize, update the local copy, and publish. + var shared = source + .Transform(obj => new ChangeSetCache(changeSetSelector(obj).Synchronize(locker))) + .Synchronize(locker) + .Do(list.Clone) + .Publish(); // Merge the child changeset changes together and apply to the tracker - var allChanges = shared - .Synchronize(locker) - .MergeMany(mc => mc.Source) + var subMergeMany = shared + .MergeMany(chanceSetCache => chanceSetCache.Source) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); // When a source item is removed, all of its sub-items need to be removed - var removedItems = shared - .Synchronize(locker) - .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) + var subRemove = shared + .OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues, observer), invokeOnUnsubscribe: false) .Subscribe(); - return new CompositeDisposable(sourceListOfCaches, allChanges, removedItems, shared.Connect()); + return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove); }); } diff --git a/src/DynamicData/List/Internal/MergeManyListChangeSets.cs b/src/DynamicData/List/Internal/MergeManyListChangeSets.cs index 5625358f5..dd2190f20 100644 --- a/src/DynamicData/List/Internal/MergeManyListChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeManyListChangeSets.cs @@ -20,20 +20,17 @@ public IObservable> Run() => { var locker = new object(); - // Transform to a changeset of Cloned Child Lists and then Share - var sourceListOfLists = source - .Transform(obj => new ClonedListChangeSet(selector(obj).Synchronize(locker), equalityComparer)) - .AsObservableList(); - // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(); - // Share a connection to the source cache - var shared = sourceListOfLists.Connect().Publish(); - - // Merge the items back together - var allChanges = shared + // Transform to a list changeset of child lists, synchronize, and publish. + var shared = source + .Transform(obj => new ClonedListChangeSet(selector(obj).Synchronize(locker), equalityComparer)) .Synchronize(locker) + .Publish(); + + // Merge the child changeset changes together and apply to the tracker + var subMergeMany = shared .MergeMany(clonedList => clonedList.Source.RemoveIndex()) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), @@ -41,11 +38,10 @@ public IObservable> Run() => observer.OnCompleted); // When a source item is removed, all of its sub-items need to be removed - var removedItems = shared - .Synchronize(locker) - .OnItemRemoved(mc => changeTracker.RemoveItems(mc.List, observer), invokeOnUnsubscribe: false) + var subRemove = shared + .OnItemRemoved(clonedList => changeTracker.RemoveItems(clonedList.List, observer), invokeOnUnsubscribe: false) .Subscribe(); - return new CompositeDisposable(sourceListOfLists, allChanges, removedItems, shared.Connect()); + return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove); }); }