diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs index 59cb1c204..ad88ee3a8 100644 --- a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForSource.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; @@ -11,7 +12,6 @@ using Xunit.Abstractions; using DynamicData.Tests.Utilities; -using System.Diagnostics; namespace DynamicData.Tests.Cache; @@ -36,7 +36,7 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item3 = new TestItem() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; @@ -51,18 +51,18 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() source.RemoveKey(2); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no items should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no items should have expired"); source.Items.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item3 }.Select(item => new KeyValuePair(item.Id, item)), "items #1 and #3 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues[0].Should().BeEquivalentTo(new[] { item1, item3 }.Select(item => new KeyValuePair(item.Id, item)), "items #1 and #3 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -77,7 +77,7 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.AddOrUpdate(item1); @@ -88,14 +88,14 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.AddOrUpdate(item2); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred"); // Shorten the expiration to an earlier time @@ -103,8 +103,8 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.AddOrUpdate(item3); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item3 }, "item #1 was replaced"); // One more update with no changes to the expiration @@ -112,24 +112,24 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.AddOrUpdate(item4); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item4 }, "item #1 was replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item4 }.Select(item => new KeyValuePair(item.Id, item4)), "item #1 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues[0].Should().BeEquivalentTo(new[] { item4 }.Select(item => new KeyValuePair(item.Id, item4)), "item #1 should have expired"); source.Items.Should().BeEmpty("item #1 should have expired"); scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEmpty("no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -145,7 +145,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() pollingInterval: TimeSpan.FromMilliseconds(20), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -190,85 +190,85 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() // Verify initial state, after all emissions - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "9 items were added, 2 were replaced, and 1 was refreshed"); // Item scheduled to expire at 10ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "no changes should have occurred"); // Item scheduled to expire at 15ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }.Select(item => new KeyValuePair(item.Id, item)), "items #1, #2, #6, #7, and #8 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues[0].Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }.Select(item => new KeyValuePair(item.Id, item)), "items #1, #2, #6, #7, and #8 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #1, #2, #6, #7, and #8 should have been removed"); // Item scheduled to expire at 30ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "no changes should have occurred"); // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); // Item scheduled to expire at 45ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have been removed"); // Expired items should be polled, but none should be found scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have been removed"); // Next poll should not find anything to expire. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9 }, "no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -283,7 +283,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -328,66 +328,66 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() // Verify initial state, after all emissions - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "11 items were added, 2 were replaced, and 1 was refreshed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues[0].Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item8, item9, item10, item11 }, "item #1 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }.Select(item => new KeyValuePair(item.Id, item)), "item #8 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }.Select(item => new KeyValuePair(item.Id, item)), "item #8 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item9, item10, item11 }, "item #8 should have expired"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }.Select(item => new KeyValuePair(item.Id, item)), "items #2, #6, and #7 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }.Select(item => new KeyValuePair(item.Id, item)), "items #2, #6, and #7 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #2, #6, and #7 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }.Select(item => new KeyValuePair(item.Id, item)), "item #3 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "no changes should have occurred"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }.Select(item => new KeyValuePair(item.Id, item)), "item #10 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have expired"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }.Select(item => new KeyValuePair(item.Id, item)), "item #11 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); // Remaining item should never expire scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(6).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(6).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9 }, "no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 @@ -406,24 +406,24 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.AddOrUpdate(item1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); scheduler.SimulateUntilIdle(inaccuracyOffset: TimeSpan.FromMilliseconds(-1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues[0].Should().BeEquivalentTo(new[] { item1 }.Select(item => new KeyValuePair(item.Id, item)), "item #1 should have expired"); source.Items.Should().BeEmpty("item #1 should have been removed"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -438,21 +438,19 @@ public void SourceCompletes_CompletionIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.AddOrUpdate(new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); scheduler.AdvanceBy(1); source.Complete(); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeTrue(); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeTrue(); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -473,14 +471,12 @@ public void SourceCompletesImmediately_CompletionIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeTrue(); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeTrue(); source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -495,7 +491,7 @@ public void SourceErrors_ErrorIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.AddOrUpdate(new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); scheduler.AdvanceBy(1); @@ -503,14 +499,12 @@ public void SourceErrors_ErrorIsPropagated() var error = new Exception("This is a test"); source.SetError(error); - results.TryGetRecordedError().Should().Be(error, "an error was published"); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().Be(error, "an error was published"); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -532,17 +526,15 @@ public void SourceErrorsImmediately_ErrorIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); - results.TryGetRecordedError().Should().Be(error, "an error was published"); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().Be(error, "an error was published"); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -565,7 +557,7 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe timeSelector: static item => item.Lifetime, scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); PerformStressEdits( source: source, @@ -576,12 +568,12 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe await WaitForCompletionAsync(source, results, timeout: TimeSpan.FromMinutes(1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(static pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().BeNull(); + results.RecordedValues.SelectMany(static removals => removals).Should().AllSatisfy(static pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); + results.HasCompleted.Should().BeFalse(); source.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - _output.WriteLine($"{results.EnumerateRecordedValues().Count()} Expirations occurred, for {results.EnumerateRecordedValues().SelectMany(static item => item).Count()} items"); + _output.WriteLine($"{results.RecordedValues.Count} Expirations occurred, for {results.RecordedValues.SelectMany(static item => item).Count()} items"); } [Fact] @@ -597,7 +589,7 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() pollingInterval: TimeSpan.FromMilliseconds(10), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); PerformStressEdits( source: source, @@ -608,12 +600,12 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() await WaitForCompletionAsync(source, results, timeout: TimeSpan.FromMinutes(1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().BeNull(); + results.RecordedValues.SelectMany(static removals => removals).Should().AllSatisfy(pair => pair.Value.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); + results.HasCompleted.Should().BeFalse(); source.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - _output.WriteLine($"{results.EnumerateRecordedValues().Count()} Expirations occurred, for {results.EnumerateRecordedValues().SelectMany(static item => item).Count()} items"); + _output.WriteLine($"{results.RecordedValues.Count} Expirations occurred, for {results.RecordedValues.SelectMany(static item => item).Count()} items"); } [Fact] @@ -637,16 +629,14 @@ public void TimeSelectorThrows_ErrorIsPropagated() timeSelector: _ => throw error, scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.AddOrUpdate(new TestItem() { Id = 1 }); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().Be(error); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); + results.Error.Should().Be(error); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); } private static TestSourceCache CreateTestSource() @@ -745,7 +735,7 @@ private static void PerformStressEdits( private static async Task WaitForCompletionAsync( ISourceCache source, - TestableObserver>> results, + ValueRecordingObserver>> results, TimeSpan timeout) { // Wait up to full minute for the operator to finish processing expirations @@ -757,13 +747,9 @@ private static async Task WaitForCompletionAsync( { await Task.Delay(pollingInterval); - // Identify "completion" as either an error, a completion signal, or all expiring items being removed. - if ((results.TryGetRecordedError() is not null) - || results.TryGetRecordedCompletion() - || source.Items.All(static item => item.Lifetime is null)) - { + // Identify "completion" as either the stream finalizing, or there being no remaining items that need to expire + if (results.HasFinalized || source.Items.All(static item => item.Lifetime is null)) break; - } } } } diff --git a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs index 0b58fc48f..868fad2fd 100644 --- a/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs +++ b/src/DynamicData.Tests/Cache/ExpireAfterFixture.ForStream.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; @@ -12,8 +13,6 @@ using Xunit; using DynamicData.Tests.Utilities; -using Xunit.Abstractions; -using System.Diagnostics; namespace DynamicData.Tests.Cache; @@ -28,12 +27,13 @@ public void ExpiredItemIsRemoved_RemovalIsSkipped() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -47,14 +47,14 @@ public void ExpiredItemIsRemoved_RemovalIsSkipped() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); // Send a notification to remove an item that's already been removed source.OnNext(new ChangeSet() @@ -64,9 +64,9 @@ public void ExpiredItemIsRemoved_RemovalIsSkipped() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Skip(2).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no changes should have occurred"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -76,12 +76,13 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item3 = new TestItem() { Id = 3, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; @@ -108,16 +109,16 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(3, "3 source operations were performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); + results.RecordedChangeSets.Count.Should().Be(3, "3 source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item3, item4 }, "3 items were added, and one was removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item4 }, "items #1 and #3 should have been removed"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -127,12 +128,13 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.OnNext(new ChangeSet() @@ -150,13 +152,13 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(2, "2 source operations were performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); + results.RecordedChangeSets.Count.Should().Be(2, "2 source operations were performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(2).Should().BeEmpty("no expirations should have occurred"); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no expirations should have occurred"); // Shorten the expiration to an earlier time var item3 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; @@ -167,8 +169,8 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Skip(2).Count().Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item3 }, "item #1 was replaced"); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item3 }, "item #1 was replaced"); // One more update with no changes to the expiration var item4 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15) }; @@ -179,21 +181,21 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Skip(3).Count().Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "item #1 was replaced"); + results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item4 }, "item #1 was replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEmpty("item #1 should have expired"); + results.RecordedChangeSets.Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEmpty("item #1 should have expired"); scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(5).Should().BeEmpty("no expirations should have occurred"); + results.RecordedChangeSets.Skip(5).Should().BeEmpty("no expirations should have occurred"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -203,13 +205,14 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), pollingInterval: TimeSpan.FromMilliseconds(20), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -218,11 +221,11 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item5 = new TestItem() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item1.Id, current: item1), - new(reason: ChangeReason.Add, key: item2.Id, current: item2), - new(reason: ChangeReason.Add, key: item3.Id, current: item3), - new(reason: ChangeReason.Add, key: item4.Id, current: item4), - new(reason: ChangeReason.Add, key: item5.Id, current: item5) + new(reason: ChangeReason.Add, key: item1.Id, current: item1, index: 0), + new(reason: ChangeReason.Add, key: item2.Id, current: item2, index: 1), + new(reason: ChangeReason.Add, key: item3.Id, current: item3, index: 2), + new(reason: ChangeReason.Add, key: item4.Id, current: item4, index: 3), + new(reason: ChangeReason.Add, key: item5.Id, current: item5, index: 4) }); scheduler.AdvanceBy(1); @@ -231,8 +234,8 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item7 = new TestItem() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item6.Id, current: item6), - new(reason: ChangeReason.Add, key: item7.Id, current: item7) + new(reason: ChangeReason.Add, key: item6.Id, current: item6, index: 5), + new(reason: ChangeReason.Add, key: item7.Id, current: item7, index: 6) }); scheduler.AdvanceBy(1); @@ -240,7 +243,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item8 = new TestItem() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item8.Id, current: item8) + new(reason: ChangeReason.Add, key: item8.Id, current: item8, index: 7) }); scheduler.AdvanceBy(1); @@ -248,7 +251,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item9 = new TestItem() { Id = 9 }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item9.Id, current: item9) + new(reason: ChangeReason.Add, key: item9.Id, current: item9, index: 8) }); scheduler.AdvanceBy(1); @@ -256,7 +259,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item10 = new TestItem() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4) + new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4, currentIndex: 3, previousIndex: 3) }); scheduler.AdvanceBy(1); @@ -264,7 +267,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() var item11 = new TestItem() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(100) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5) + new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5, currentIndex: 4, previousIndex: 4) }); scheduler.AdvanceBy(1); @@ -272,7 +275,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Refresh, key: item3.Id, current: item3) + new(reason: ChangeReason.Refresh, key: item3.Id, current: item3, index: 2) }); scheduler.AdvanceBy(1); @@ -287,74 +290,79 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() // Verify initial state, after all emissions results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(7, "8 source operations were performed, and 1 should have been ignored"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "9 items were added, 2 were replaced, and 1 was refreshed"); + results.RecordedChangeSets.Count.Should().Be(7, "8 source operations were performed, and 1 should have been ignored"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "9 items were added, 2 were replaced, and 1 was refreshed"); + results.RecordedItemsSorted.Should().BeEmpty(); // Item scheduled to expire at 10ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(7).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(7).Should().BeEmpty("no changes should have occurred"); // Item scheduled to expire at 15ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(7).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(7).Should().BeEmpty("no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #1, #2, #6, #7, and #8 should have been removed"); + results.RecordedChangeSets.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #1, #2, #6, #7, and #8 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); // Item scheduled to expire at 30ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(8).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(8).Should().BeEmpty("no changes should have occurred"); // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + results.RecordedChangeSets.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); // Item scheduled to expire at 45ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(9).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(9).Should().BeEmpty("no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have been removed"); + results.RecordedChangeSets.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); // Expired items should be polled, but none should be found scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(10).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(10).Should().BeEmpty("no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have been removed"); + results.RecordedChangeSets.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); // Next poll should not find anything to expire. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(11).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(11).Should().BeEmpty("no changes should have occurred"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -364,12 +372,13 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -378,11 +387,11 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item5 = new TestItem() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item1.Id, current: item1), - new(reason: ChangeReason.Add, key: item2.Id, current: item2), - new(reason: ChangeReason.Add, key: item3.Id, current: item3), - new(reason: ChangeReason.Add, key: item4.Id, current: item4), - new(reason: ChangeReason.Add, key: item5.Id, current: item5) + new(reason: ChangeReason.Add, key: item1.Id, current: item1, index: 0), + new(reason: ChangeReason.Add, key: item2.Id, current: item2, index: 1), + new(reason: ChangeReason.Add, key: item3.Id, current: item3, index: 2), + new(reason: ChangeReason.Add, key: item4.Id, current: item4, index: 3), + new(reason: ChangeReason.Add, key: item5.Id, current: item5, index: 4) }); scheduler.AdvanceBy(1); @@ -391,8 +400,8 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item7 = new TestItem() { Id = 7, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20)}; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item6.Id, current: item6), - new(reason: ChangeReason.Add, key: item7.Id, current: item7) + new(reason: ChangeReason.Add, key: item6.Id, current: item6, index: 5), + new(reason: ChangeReason.Add, key: item7.Id, current: item7, index: 6) }); scheduler.AdvanceBy(1); @@ -400,7 +409,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item8 = new TestItem() { Id = 8, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(15)}; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item8.Id, current: item8) + new(reason: ChangeReason.Add, key: item8.Id, current: item8, index: 7) }); scheduler.AdvanceBy(1); @@ -408,7 +417,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item9 = new TestItem() { Id = 9 }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Add, key: item9.Id, current: item9) + new(reason: ChangeReason.Add, key: item9.Id, current: item9, index: 8) }); scheduler.AdvanceBy(1); @@ -416,7 +425,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item10 = new TestItem() { Id = 4, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(45) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4) + new(reason: ChangeReason.Update, key: item10.Id, current: item10, previous: item4, currentIndex: 3, previousIndex: 3) }); scheduler.AdvanceBy(1); @@ -424,7 +433,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() var item11 = new TestItem() { Id = 5, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(50) }; source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5) + new(reason: ChangeReason.Update, key: item11.Id, current: item11, previous: item5, currentIndex: 4, previousIndex: 4) }); scheduler.AdvanceBy(1); @@ -432,7 +441,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() item3.Expiration = DateTimeOffset.FromUnixTimeMilliseconds(55); source.OnNext(new ChangeSet() { - new(reason: ChangeReason.Refresh, key: item3.Id, current: item3) + new(reason: ChangeReason.Refresh, key: item3.Id, current: item3, index: 2) }); scheduler.AdvanceBy(1); @@ -447,57 +456,64 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() // Verify initial state, after all emissions results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(7, "8 source operations were performed, and 1 should have been ignored"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "11 items were added, 2 were replaced, and 1 was refreshed"); + results.RecordedChangeSets.Count.Should().Be(7, "8 source operations were performed, and 1 should have been ignored"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3, item6, item7, item8, item9, item10, item11 }, "11 items were added, 2 were replaced, and 1 was refreshed"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item8, item9, item10, item11 }, "item #1 should have been removed"); + results.RecordedChangeSets.Skip(7).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item8, item9, item10, item11 }, "item #1 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item9, item10, item11 }, "item #8 should have expired"); + results.RecordedChangeSets.Skip(8).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2, item3, item6, item7, item9, item10, item11 }, "item #8 should have expired"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #2, #6, and #7 should have been removed"); + results.RecordedChangeSets.Skip(9).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item3, item9, item10, item11 }, "items #2, #6, and #7 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + results.RecordedChangeSets.Skip(10).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9, item10, item11 }, "item #3 should have been removed"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(11).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(11).Should().BeEmpty("no changes should have occurred"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(11).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have expired"); + results.RecordedChangeSets.Skip(11).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9, item11 }, "item #10 should have expired"); + results.RecordedItemsSorted.Should().BeEmpty(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(12).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); + results.RecordedChangeSets.Skip(12).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired"); + results.RecordedItemsSorted.Should().BeEmpty(); // Remaining item should never expire scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(13).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(13).Should().BeEmpty("no changes should have occurred"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -507,12 +523,13 @@ public void RemovalsArePending_CompletionWaitsForRemovals() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2 }; @@ -527,27 +544,27 @@ public void RemovalsArePending_CompletionWaitsForRemovals() // Verify initial state, after all emissions results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); results.Error.Should().BeNull(); - results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have been removed"); source.OnCompleted(); results.Error.Should().BeNull(); - results.IsCompleted.Should().BeFalse("removals are pending"); - results.Messages.Skip(2).Should().BeEmpty("no changes should have occurred"); + results.HasCompleted.Should().BeFalse("removals are pending"); + results.RecordedChangeSets.Skip(2).Should().BeEmpty("no changes should have occurred"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); results.Error.Should().BeNull(); - results.IsCompleted.Should().BeTrue(); - results.Messages.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "item #3 should have expired"); + results.HasCompleted.Should().BeTrue(); + results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item2 }, "item #3 should have expired"); } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 @@ -561,12 +578,13 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() Now = DateTimeOffset.FromUnixTimeMilliseconds(0) }; - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.OnNext(new ChangeSet() @@ -576,16 +594,16 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); scheduler.SimulateUntilIdle(inaccuracyOffset: TimeSpan.FromMilliseconds(-1)); results.Error.Should().BeNull(); - results.Messages.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.Data.Items.Should().BeEmpty("item #1 should have been removed"); + results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedItemsByKey.Values.Should().BeEmpty("item #1 should have been removed"); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -595,12 +613,13 @@ public void SourceCompletes_CompletionIsPropagated() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1 }; var item2 = new TestItem() { Id = 2 }; @@ -613,15 +632,15 @@ public void SourceCompletes_CompletionIsPropagated() }); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); source.OnCompleted(); results.Error.Should().BeNull(); - results.Messages.Skip(1).Should().BeEmpty("no changes should have occurred"); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no changes should have occurred"); - results.IsCompleted.Should().BeTrue(); + results.HasCompleted.Should().BeTrue(); } [Fact] @@ -647,18 +666,19 @@ public void SourceCompletesImmediately_CompletionIsPropagated() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operation was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were added"); - results.IsCompleted.Should().BeTrue(); + results.HasCompleted.Should().BeTrue(); } [Fact] @@ -668,12 +688,13 @@ public void SourceErrors_ErrorIsPropagated() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: item => item.Expiration - scheduler.Now, scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.OnNext(new ChangeSet() @@ -683,19 +704,19 @@ public void SourceErrors_ErrorIsPropagated() scheduler.AdvanceBy(1); results.Error.Should().BeNull(); - results.Messages.Count.Should().Be(1, "1 source operations was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operations was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); var error = new Exception("This is a test"); source.OnError(error); results.Error.Should().Be(error); - results.Messages.Skip(1).Should().BeEmpty("no changes should have occurred"); - results.IsCompleted.Should().BeFalse(); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("no changes should have occurred"); + results.HasCompleted.Should().BeFalse(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.Messages.Skip(1).Should().BeEmpty("notifications should not get published after an error"); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("notifications should not get published after an error"); } [Fact] @@ -719,21 +740,22 @@ public void SourceErrorsImmediately_ErrorIsPropagated() var scheduler = CreateTestScheduler(); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: item => item.Expiration - scheduler.Now, scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); results.Error.Should().Be(error); - results.Messages.Count.Should().Be(1, "1 source operations was performed"); - results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); - results.IsCompleted.Should().BeFalse(); + results.RecordedChangeSets.Count.Should().Be(1, "1 source operations was performed"); + results.RecordedItemsByKey.Values.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); + results.HasCompleted.Should().BeFalse(); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.Messages.Skip(1).Should().BeEmpty("notifications should not get published after an error"); + results.RecordedChangeSets.Skip(1).Should().BeEmpty("notifications should not get published after an error"); } [Fact] @@ -750,12 +772,13 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe var scheduler = ThreadPoolScheduler.Instance; - using var results = source + using var subscription = source .ExpireAfter( timeSelector: static item => item.Lifetime, scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); PublishStressChangeSets( source: source, @@ -767,9 +790,9 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe await WaitForCompletionAsync(results, timeout: TimeSpan.FromMinutes(1)); results.Error.Should().BeNull(); - results.Data.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); + results.RecordedItemsByKey.Values.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -779,13 +802,14 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() var scheduler = ThreadPoolScheduler.Instance; - using var results = source + using var subscription = source .ExpireAfter( timeSelector: static item => item.Lifetime, pollingInterval: TimeSpan.FromMilliseconds(10), scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); PublishStressChangeSets( source: source, @@ -799,9 +823,9 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() var now = scheduler.Now; results.Error.Should().BeNull(); - results.Data.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); + results.RecordedItemsByKey.Values.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - results.IsCompleted.Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -819,12 +843,13 @@ public void TimeSelectorThrows_ErrorIsPropagated() var error = new Exception("This is a test."); - using var results = source + using var subscription = source .ExpireAfter( timeSelector: _ => throw error, scheduler: scheduler) .ValidateSynchronization() - .AsAggregator(); + .ValidateChangeSets(static item => item.Id) + .RecordCacheItems(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.OnNext(new ChangeSet() @@ -834,8 +859,8 @@ public void TimeSelectorThrows_ErrorIsPropagated() scheduler.AdvanceBy(1); results.Error.Should().Be(error); - results.Messages.Should().BeEmpty("no source operations should have been processed"); - results.IsCompleted.Should().BeFalse(); + results.RecordedChangeSets.Should().BeEmpty("no source operations should have been processed"); + results.HasCompleted.Should().BeFalse(); } private static void PublishStressChangeSets( @@ -938,7 +963,7 @@ private static void PublishStressChangeSets( } private static async Task WaitForCompletionAsync( - ChangeSetAggregator results, + CacheItemRecordingObserver results, TimeSpan timeout) { // Wait up to full minute for the operator to finish processing expirations @@ -952,8 +977,8 @@ private static async Task WaitForCompletionAsync( // Identify "completion" as either an error, a completion signal, or all expiring items being removed. if ((results.Error is not null) - || results.IsCompleted - || results.Data.Items.All(static item => item.Lifetime is null)) + || results.HasCompleted + || results.RecordedItemsByKey.Values.All(static item => item.Lifetime is null)) { break; } diff --git a/src/DynamicData.Tests/List/ExpireAfterFixture.cs b/src/DynamicData.Tests/List/ExpireAfterFixture.cs index 7b8d946df..9ad68eb16 100644 --- a/src/DynamicData.Tests/List/ExpireAfterFixture.cs +++ b/src/DynamicData.Tests/List/ExpireAfterFixture.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; @@ -12,8 +14,6 @@ using Xunit.Abstractions; using DynamicData.Tests.Utilities; -using System.Collections.Generic; -using System.Diagnostics; namespace DynamicData.Tests.List; @@ -36,7 +36,7 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; @@ -58,18 +58,18 @@ public void ItemIsRemovedBeforeExpiration_ExpirationIsCancelled() source.RemoveRange(index: 2, count: 2); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no items should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no items should have expired"); source.Items.Should().BeEquivalentTo(new[] { item1, item3, item6, item7 }, "7 items were added, and 3 were removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item3, item6 }, "items #1, #3, and #6 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.ElementAt(0).Should().BeEquivalentTo(new[] { item1, item3, item6 }, "items #1, #3, and #6 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item7 }, "items #1 and #3 should have been removed"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -84,7 +84,7 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.Add(item1); @@ -95,14 +95,14 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.Replace(item1, item2); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item2 }, "item #1 was added, and then replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred"); // Shorten the expiration to an earlier time @@ -110,8 +110,8 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.Replace(item2, item3); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item3 }, "item #2 was replaced"); // One more update with no changes to the expiration @@ -119,24 +119,24 @@ public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded() source.Replace(item3, item4); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item4 }, "item #3 was replaced"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item4 }, "item #4 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.ElementAt(0).Should().BeEquivalentTo(new[] { item4 }, "item #4 should have expired"); source.Items.Should().BeEmpty("item #4 should have expired"); scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEmpty("no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -152,7 +152,7 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() pollingInterval: TimeSpan.FromMilliseconds(20), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -197,85 +197,85 @@ public void PollingIntervalIsGiven_RemovalsAreScheduledAtInterval() // Verify initial state, after all emissions - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "9 items were added, 2 were replaced, and 1 was refreshed"); // Item scheduled to expire at 10ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); // Item scheduled to expire at 15ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }, "items #1, #2, #6, #7, and #8 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.ElementAt(0).Should().BeEquivalentTo(new[] { item1, item2, item6, item7, item8 }, "items #1, #2, #6, #7, and #8 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "items #1, #2, #6, #7, and #8 should have been removed"); // Item scheduled to expire at 30ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); // Expired items should be polled, but should exclude the one that was changed from 40ms to 45ms. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "item #3 should have been removed"); // Item scheduled to expire at 45ms, but won't be picked up yet scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(60).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, "item #10 should have been removed"); // Expired items should be polled, but none should be found scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(80).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); // Expired items should be polled scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(100).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "item #11 should have been removed"); // Next poll should not find anything to expire. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(120).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -290,7 +290,7 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; var item2 = new TestItem() { Id = 2, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(20) }; @@ -333,65 +333,65 @@ public void PollingIntervalIsNotGiven_RemovalsAreScheduledImmediately() // Verify initial state, after all emissions - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1, item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "9 items were added, 2 were replaced, and 1 was moved"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item2, item10, item3, item11, item6, item7, item8, item9 }, options => options.WithStrictOrdering(), "item #1 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(15).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }, "item #8 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(1).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(1).ElementAt(0).Should().BeEquivalentTo(new[] { item8 }, "item #8 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item2, item10, item3, item11, item6, item7, item9 }, options => options.WithStrictOrdering(), "item #8 should have expired"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(20).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }, "items #2, #6, and #7 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(2).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(2).ElementAt(0).Should().BeEquivalentTo(new[] { item2, item6, item7 }, "items #2, #6, and #7 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item10, item3, item11, item9 }, options => options.WithStrictOrdering(), "items #2, #6, and #7 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(30).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(3).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(3).ElementAt(0).Should().BeEquivalentTo(new[] { item3 }, "item #3 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "item #3 should have been removed"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(40).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item10, item11, item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(45).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(4).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(4).ElementAt(0).Should().BeEquivalentTo(new[] { item10 }, "item #10 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item11, item9 }, options => options.WithStrictOrdering(), "item #10 should have expired"); scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(50).Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(5).Count().Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.Skip(5).ElementAt(0).Should().BeEquivalentTo(new[] { item11 }, "item #11 should have expired"); source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "item #11 should have expired"); scheduler.AdvanceTo(DateTimeOffset.MaxValue.Ticks); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Skip(6).Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Skip(6).Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item9 }, options => options.WithStrictOrdering(), "no changes should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } // Covers https://github.com/reactivemarbles/DynamicData/issues/716 @@ -410,24 +410,24 @@ public void SchedulerIsInaccurate_RemovalsAreNotSkipped() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); var item1 = new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }; source.Add(item1); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); source.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was added"); scheduler.SimulateUntilIdle(inaccuracyOffset: TimeSpan.FromMilliseconds(-1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Count().Should().Be(1, "1 expiration should have occurred"); - results.EnumerateRecordedValues().ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); + results.Error.Should().BeNull(); + results.RecordedValues.Count.Should().Be(1, "1 expiration should have occurred"); + results.RecordedValues.ElementAt(0).Should().BeEquivalentTo(new[] { item1 }, "item #1 should have expired"); source.Items.Should().BeEmpty("item #1 should have been removed"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.HasCompleted.Should().BeFalse(); } [Fact] @@ -442,21 +442,19 @@ public void SourceCompletes_CompletionIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.Add(new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); scheduler.AdvanceBy(1); source.Complete(); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeTrue(); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeTrue(); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -477,14 +475,12 @@ public void SourceCompletesImmediately_CompletionIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeTrue(); + results.Error.Should().BeNull(); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeTrue(); source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -499,7 +495,7 @@ public void SourceErrors_ErrorIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.Add(new TestItem() { Id = 1, Expiration = DateTimeOffset.FromUnixTimeMilliseconds(10) }); scheduler.AdvanceBy(1); @@ -507,14 +503,12 @@ public void SourceErrors_ErrorIsPropagated() var error = new Exception("This is a test"); source.SetError(error); - results.TryGetRecordedError().Should().Be(error, "an error was published"); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().Be(error, "an error was published"); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -536,17 +530,15 @@ public void SourceErrorsImmediately_ErrorIsPropagated() timeSelector: CreateTimeSelector(scheduler), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); - results.TryGetRecordedError().Should().Be(error, "an error was published"); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().Be(error, "an error was published"); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); source.Items.Should().BeEquivalentTo(new[] { item1 }, "no changes should have occurred"); // Ensure that the operator does not attept to continue removing items. scheduler.AdvanceTo(DateTimeOffset.FromUnixTimeMilliseconds(10).Ticks); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); } [Fact] @@ -569,7 +561,7 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe timeSelector: static item => item.Lifetime, scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); PerformStressEdits( source: source, @@ -581,12 +573,12 @@ public async Task ThreadPoolSchedulerIsUsedWithoutPolling_ExpirationIsThreadSafe await WaitForCompletionAsync(source, results, TimeSpan.FromMinutes(1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(static item => item.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().BeNull(); + results.RecordedValues.SelectMany(static removals => removals).Should().AllSatisfy(static item => item.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); + results.HasCompleted.Should().BeFalse(); source.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - _output.WriteLine($"{results.EnumerateRecordedValues().Count()} Expirations occurred, for {results.EnumerateRecordedValues().SelectMany(static item => item).Count()} items"); + _output.WriteLine($"{results.RecordedValues.Count} Expirations occurred, for {results.RecordedValues.SelectMany(static item => item).Count()} items"); } [Fact] @@ -602,7 +594,7 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() pollingInterval: TimeSpan.FromMilliseconds(10), scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); PerformStressEdits( source: source, @@ -614,12 +606,12 @@ public async Task ThreadPoolSchedulerIsUsedWithPolling_ExpirationIsThreadSafe() await WaitForCompletionAsync(source, results, TimeSpan.FromMinutes(1)); - results.TryGetRecordedError().Should().BeNull(); - results.EnumerateRecordedValues().SelectMany(static removals => removals).Should().AllSatisfy(item => item.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); - results.TryGetRecordedCompletion().Should().BeFalse(); + results.Error.Should().BeNull(); + results.RecordedValues.SelectMany(static removals => removals).Should().AllSatisfy(item => item.Lifetime.Should().NotBeNull("only items with an expiration should have expired")); + results.HasCompleted.Should().BeFalse(); source.Items.Should().AllSatisfy(item => item.Lifetime.Should().BeNull("all items with an expiration should have expired")); - _output.WriteLine($"{results.EnumerateRecordedValues().Count()} Expirations occurred, for {results.EnumerateRecordedValues().SelectMany(static item => item).Count()} items"); + _output.WriteLine($"{results.RecordedValues.Count} Expirations occurred, for {results.RecordedValues.SelectMany(static item => item).Count()} items"); } [Fact] @@ -643,16 +635,14 @@ public void TimeSelectorThrows_ThrowsException() timeSelector: _ => throw error, scheduler: scheduler) .ValidateSynchronization() - .RecordNotifications(out var results, scheduler); + .RecordValues(out var results, scheduler); source.Add(new TestItem() { Id = 1 }); scheduler.AdvanceBy(1); - results.TryGetRecordedError().Should().Be(error); - results.EnumerateRecordedValues().Should().BeEmpty("no expirations should have occurred"); - results.TryGetRecordedCompletion().Should().BeFalse(); - - results.EnumerateInvalidNotifications().Should().BeEmpty(); + results.Error.Should().Be(error); + results.RecordedValues.Should().BeEmpty("no expirations should have occurred"); + results.HasCompleted.Should().BeFalse(); } private static TestScheduler CreateTestScheduler() @@ -804,7 +794,7 @@ private static void PerformStressEdits( private static async Task WaitForCompletionAsync( ISourceList source, - TestableObserver> results, + ValueRecordingObserver> results, TimeSpan timeout) { // Wait up to full minute for the operator to finish processing expirations @@ -816,13 +806,9 @@ private static async Task WaitForCompletionAsync( { await Task.Delay(pollingInterval); - // Identify "completion" as either an error, a completion signal, or all expiring items being removed. - if ((results.TryGetRecordedError() is not null) - || results.TryGetRecordedCompletion() - || source.Items.All(static item => item.Lifetime is null)) - { + // Identify "completion" as either the stream finalizing, or there being no remaining items that need to expire + if (results.HasFinalized || source.Items.All(static item => item.Lifetime is null)) break; - } } } diff --git a/src/DynamicData.Tests/Utilities/CacheItemRecordingObserver.cs b/src/DynamicData.Tests/Utilities/CacheItemRecordingObserver.cs new file mode 100644 index 000000000..7f0eba1b9 --- /dev/null +++ b/src/DynamicData.Tests/Utilities/CacheItemRecordingObserver.cs @@ -0,0 +1,71 @@ +using System.Collections.Generic; +using System.Reactive.Concurrency; + +namespace DynamicData.Tests.Utilities; + +public sealed class CacheItemRecordingObserver + : RecordingObserverBase> + where TObject : notnull + where TKey : notnull +{ + private readonly List> _recordedChangeSets; + private readonly Dictionary _recordedItemsByKey; + private readonly List _recordedItemsSorted; + + public CacheItemRecordingObserver(IScheduler scheduler) + : base(scheduler) + { + _recordedChangeSets = new(); + _recordedItemsByKey = new(); + _recordedItemsSorted = new(); + } + + public IReadOnlyList> RecordedChangeSets + => _recordedChangeSets; + + public IReadOnlyDictionary RecordedItemsByKey + => _recordedItemsByKey; + + public IReadOnlyList RecordedItemsSorted + => _recordedItemsSorted; + + protected override void OnNext(IChangeSet value) + { + if (!HasFinalized) + { + _recordedChangeSets.Add(value); + + foreach (var change in value) + { + switch (change.Reason) + { + case ChangeReason.Add: + _recordedItemsByKey.Add(change.Key, change.Current); + if (change.CurrentIndex is not -1) + _recordedItemsSorted.Insert(change.CurrentIndex, change.Current); + break; + + case ChangeReason.Moved: + _recordedItemsSorted.RemoveAt(change.PreviousIndex); + _recordedItemsSorted.Insert(change.CurrentIndex, change.Current); + break; + + case ChangeReason.Remove: + _recordedItemsByKey.Remove(change.Key); + if (change.CurrentIndex is not -1) + _recordedItemsSorted.RemoveAt(change.CurrentIndex); + break; + + case ChangeReason.Update: + _recordedItemsByKey[change.Key] = change.Current; + if (change.CurrentIndex is not -1) + { + _recordedItemsSorted.RemoveAt(change.PreviousIndex); + _recordedItemsSorted.Insert(change.CurrentIndex, change.Current); + } + break; + } + } + } + } +} diff --git a/src/DynamicData.Tests/Utilities/ListItemRecordingObserver.cs b/src/DynamicData.Tests/Utilities/ListItemRecordingObserver.cs new file mode 100644 index 000000000..8fb48e85c --- /dev/null +++ b/src/DynamicData.Tests/Utilities/ListItemRecordingObserver.cs @@ -0,0 +1,74 @@ +using System.Collections.Generic; +using System.Reactive.Concurrency; + +namespace DynamicData.Tests.Utilities; + +public sealed class ListItemRecordingObserver + : RecordingObserverBase> + where T : notnull +{ + private readonly List> _recordedChangeSets; + private readonly List _recordedItems; + + public ListItemRecordingObserver(IScheduler scheduler) + : base(scheduler) + { + _recordedChangeSets = new(); + _recordedItems = new(); + } + + public IReadOnlyList> RecordedChangeSets + => _recordedChangeSets; + + public IReadOnlyList RecordedItems + => _recordedItems; + + protected override void OnNext(IChangeSet value) + { + if (!HasFinalized) + { + _recordedChangeSets.Add(value); + + foreach (var change in value) + { + switch (change.Reason) + { + case ListChangeReason.Add: + if (change.Item.CurrentIndex is -1) + _recordedItems.Add(change.Item.Current); + else + _recordedItems.Insert(change.Item.CurrentIndex, change.Item.Current); + break; + + case ListChangeReason.AddRange: + if (change.Range.Index is -1) + _recordedItems.AddRange(change.Range); + else + _recordedItems.InsertRange(change.Range.Index, change.Range); + break; + + case ListChangeReason.Clear: + _recordedItems.Clear(); + break; + + case ListChangeReason.Moved: + _recordedItems.RemoveAt(change.Item.PreviousIndex); + _recordedItems.Insert(change.Item.CurrentIndex, change.Item.Current); + break; + + case ListChangeReason.Remove: + _recordedItems.RemoveAt(change.Item.CurrentIndex); + break; + + case ListChangeReason.RemoveRange: + _recordedItems.RemoveRange(change.Range.Index, change.Range.Count); + break; + + case ListChangeReason.Replace: + _recordedItems[change.Item.CurrentIndex] = change.Item.Current; + break; + } + } + } + } +} diff --git a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs index c67aaedcb..76473e153 100644 --- a/src/DynamicData.Tests/Utilities/ObservableExtensions.cs +++ b/src/DynamicData.Tests/Utilities/ObservableExtensions.cs @@ -4,8 +4,11 @@ using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Linq; +using System.Runtime.CompilerServices; using System.Threading; +using FluentAssertions; + namespace DynamicData.Tests.Utilities; internal static class ObservableExtensions @@ -49,18 +52,343 @@ public static IObservable Parallelize(this IObservable source, int c public static IObservable Parallelize(this IObservable source, int count, int parallel) => Observable.Merge(Distribute(count, parallel).Select(n => source.Take(n))); - public static IDisposable RecordNotifications( + public static IDisposable RecordCacheItems( + this IObservable> source, + out CacheItemRecordingObserver observer, + IScheduler? scheduler = null) + where TObject : notnull + where TKey : notnull + { + observer = new CacheItemRecordingObserver(scheduler ?? GlobalConfig.DefaultScheduler); + + return source.Subscribe(observer); + } + + public static IDisposable RecordValues( this IObservable source, - out TestableObserver observer, + out ValueRecordingObserver observer, IScheduler? scheduler = null) { - observer = TestableObserver.Create(scheduler); + observer = new ValueRecordingObserver(scheduler ?? GlobalConfig.DefaultScheduler); return source.Subscribe(observer); } + + public static IObservable> ValidateChangeSets(this IObservable> source) + where T : notnull + // Using Raw observable and observer classes to bypass normal RX safeguards + // This allows the operator to be combined with other operators that might be testing for things that the safeguards normally prevent. + => RawAnonymousObservable.Create>(observer => + { + var sortedItems = new List(); + + var reasons = Enum.GetValues(); + + return source.SubscribeSafe(RawAnonymousObserver.Create>( + onNext: changes => + { + try + { + foreach (var change in changes) + { + change.Range.Should().NotBeNull(); + + change.Reason.Should().BeOneOf(reasons); + + switch (change.Reason.GetChangeType()) + { + case ChangeType.Item: + change.Item.Reason.Should().Be(change.Reason); + + change.Range.Should().BeEmpty("single-item changes should not specify range info"); + break; + + case ChangeType.Range: + change.Item.Reason.Should().Be(default, "range changes should not specify single-item info"); + change.Item.PreviousIndex.Should().Be(-1, "range changes should not specify single-item info"); + change.Item.Previous.HasValue.Should().BeFalse("range changes should not specify single-item info"); + change.Item.CurrentIndex.Should().Be(-1, "range changes should not specify single-item info"); + change.Item.Current.Should().Be(default, "range changes should not specify single-item info"); + break; + } + + switch (change.Reason) + { + case ListChangeReason.Add: + change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); + change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + + change.Item.CurrentIndex.Should().BeInRange(-1, sortedItems.Count, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection"); + if (change.Item.CurrentIndex is -1) + sortedItems.Add(change.Item.Current); + else + sortedItems.Insert( + index: change.Item.CurrentIndex, + item: change.Item.Current); + + break; + + case ListChangeReason.AddRange: + change.Range.Index.Should().BeInRange(-1, sortedItems.Count - 1, "the insertion index should be omitted, a valid index of the collection, or the next available index of the collection"); + if (change.Range.Index is -1) + sortedItems.AddRange(change.Range); + else + sortedItems.InsertRange( + index: change.Range.Index, + collection: change.Range); + + break; + + case ListChangeReason.Clear: + change.Range.Index.Should().Be(-1, "a Clear change has no target index"); + change.Range.Should().BeEquivalentTo( + sortedItems, + config => config.WithStrictOrdering(), + "items in the range should match the corresponding items in the collection"); + + sortedItems.Clear(); + + break; + + case ListChangeReason.Moved: + sortedItems.Should().NotBeEmpty("an item cannot be moved within an empty collection"); + + change.Item.PreviousIndex.Should().BeInRange(0, sortedItems.Count - 1, "the source index should be a valid index of the collection"); + change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the target index should be a valid index of the collection"); + change.Item.Current.Should().Be(sortedItems[change.Item.PreviousIndex], "the item to be moved should match the corresponding item in the collection"); + + sortedItems.RemoveAt(change.Item.PreviousIndex); + sortedItems.Insert( + index: change.Item.CurrentIndex, + item: change.Item.Current); + + break; + + case ListChangeReason.Refresh: + sortedItems.Should().NotBeEmpty("an item cannot be refreshed within an empty collection"); + + change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); + change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the target index should be a valid index of the collection"); + change.Item.Current.Should().Be(sortedItems[change.Item.CurrentIndex], "the item to be refreshed should match the corresponding item in the collection"); + + break; + + case ListChangeReason.Remove: + sortedItems.Should().NotBeEmpty("an item cannot be removed from an empty collection"); + + change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); + change.Item.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index to be removed should be a valid index of the collection"); + change.Item.Current.Should().Be(sortedItems[change.Item.CurrentIndex], "the item to be removed should match the corresponding item in the collection"); + + sortedItems.RemoveAt(change.Item.CurrentIndex); + + break; + + case ListChangeReason.RemoveRange: + change.Range.Index.Should().BeInRange(-1, sortedItems.Count - 1, "the removal index should be omitted, or a valid index of the collection"); + + if (change.Range.Index is -1) + change.Range.Should().BeEmpty("the removal index was omitted"); + else + { + change.Range.Count.Should().BeInRange(1, sortedItems.Count - change.Range.Index, "the range to be removed should contain more items than exist in the collection, at the given removal index"); + change.Range.Should().BeEquivalentTo( + sortedItems + .Skip(change.Range.Index) + .Take(change.Range.Count), + config => config.WithStrictOrdering(), "items to be removed should match the corresponding items in the collection"); + + sortedItems.RemoveRange( + index: change.Range.Index, + count: change.Range.Count); + } + + break; + + case ListChangeReason.Replace: + sortedItems.Should().NotBeEmpty("an item cannot be replaced within an empty collection"); + + change.Item.PreviousIndex.Should().Be(-1, "only Moved changes should specify a previous index"); + change.Item.CurrentIndex.Should().BeInRange(0, sortedItems.Count - 1, "the index to be replaced should be a valid index of the collection"); + change.Item.Previous.HasValue.Should().BeTrue("a Replace change should specify a previous item"); + change.Item.Previous.Should().Be(sortedItems[change.Item.CurrentIndex], "the replaced item should match the corresponding item in the collection"); + + sortedItems[change.Item.CurrentIndex] = change.Item.Current; + + break; + } + } + } + catch (Exception ex) + { + observer.OnError(ex); + } + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); + + public static IObservable> ValidateChangeSets( + this IObservable> source, + Func keySelector) + where TObject : notnull + where TKey : notnull + // Using Raw observable and observer classes to bypass normal RX safeguards + // This allows the operator to be combined with other operators that might be testing for things that the safeguards normally prevent. + => RawAnonymousObservable.Create>(observer => + { + var itemsByKey = new Dictionary(); + var sortedKeys = new List(); + var isSorted = null as bool?; + + var reasons = Enum.GetValues(); + + return source.SubscribeSafe(RawAnonymousObserver.Create>( + onNext: changes => + { + try + { + foreach (var change in changes) + { + change.Reason.Should().BeOneOf(reasons); + + change.Key.Should().Be(keySelector.Invoke(change.Current), "the specified key should match the specified item's key"); + + switch (isSorted) + { + // First change determines whether or not all future changesets need to have indexes + case null: + isSorted = change.CurrentIndex is not -1; + break; + + case true: + change.CurrentIndex.Should().BeGreaterThan(-1, "indexes should be specified for a stream that specified them initially"); + break; + + case false: + change.CurrentIndex.Should().Be(-1, "indexes should be omitted for a stream that omitted them initially"); + break; + } + + switch (change.Reason) + { + case ChangeReason.Add: + itemsByKey.Keys.Should().NotContain(change.Key, "the key to be added should not already exist in the collection"); + + change.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.PreviousIndex.Should().Be(-1, "only Moved or Update changes should specify a previous index"); + + if (change.CurrentIndex is not -1) + { + change.CurrentIndex.Should().BeInRange(0, sortedKeys.Count, "the index to be added should be a valid index of the collection, or the next available index of the collection"); + + sortedKeys.Insert( + index: change.CurrentIndex, + item: change.Key); + } + + itemsByKey.Add(change.Key, change.Current); + + break; + + case ChangeReason.Moved: + itemsByKey.Keys.Should().Contain(change.Key, "the key to be moved should exist in the collection"); + + change.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.PreviousIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the source index should be a valid index of the collection"); + + change.Current.Should().Be(itemsByKey[change.Key], "the item to be moved should match the corresponding item in the collection"); + change.CurrentIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the target index should be a valid index of the collection"); + + sortedKeys.RemoveAt(change.PreviousIndex); + sortedKeys.Insert( + index: change.CurrentIndex, + item: change.Key); + + break; + + case ChangeReason.Refresh: + itemsByKey.Keys.Should().Contain(change.Key, "the key to be refreshed should exist in the collection"); + + change.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.PreviousIndex.Should().Be(-1, "only Moved or Update changes should specify a previous index"); + + change.Current.Should().Be(itemsByKey[change.Key], "the item to be refreshed should match the corresponding item in the collection"); + + if (change.CurrentIndex is not -1) + { + change.CurrentIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the index to be refreshed should be a valid index of the collection"); + change.Key.Should().Be(sortedKeys[change.CurrentIndex], "the key to be refreshed should match the corresponding key in the collection"); + } + + break; + + case ChangeReason.Remove: + itemsByKey.Keys.Should().Contain(change.Key, "the key to be removed should exist in the collection"); + + change.Previous.HasValue.Should().BeFalse("only Update changes should specify a previous item"); + change.PreviousIndex.Should().Be(-1, "only Moved or Update changes should specify a previous index"); + + change.Current.Should().Be(itemsByKey[change.Key], "the item to be removed should match the corresponding item in the collection"); + + if (change.CurrentIndex is not -1) + { + change.CurrentIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the index to be removed should be a valid index of the collection"); + change.Key.Should().Be(sortedKeys[change.CurrentIndex], "the key to be removed should match the corresponding key in the collection"); + + sortedKeys.RemoveAt(change.CurrentIndex); + } + + itemsByKey.Remove(change.Key); + + break; + + case ChangeReason.Update: + itemsByKey.Keys.Should().Contain(change.Key, "the key to be updated should exist in the collection"); + + change.Previous.HasValue.Should().BeTrue("an Update change should specify a previous item"); + change.Previous.Value.Should().Be(itemsByKey[change.Key], "the item to be updated should match the corresponding item in the collection"); + + if (change.CurrentIndex is -1) + { + change.PreviousIndex.Should().Be(-1, "a previous index should only be specified if a current index is specified"); + } + else + { + change.PreviousIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the source index should be a valid index of the collection"); + change.Key.Should().Be(sortedKeys[change.PreviousIndex], "the key to be updated should match the corresponding key in the collection"); + + change.CurrentIndex.Should().BeInRange(0, sortedKeys.Count - 1, "the target index should be a valid index of the collection"); + + sortedKeys.RemoveAt(change.PreviousIndex); + sortedKeys.Insert( + index: change.CurrentIndex, + item: change.Key); + } + + itemsByKey[change.Key] = change.Current; + + break; + } + } + + observer.OnNext(changes); + } + catch (Exception ex) + { + observer.OnError(ex); + } + }, + onError: observer.OnError, + onCompleted: observer.OnCompleted)); + }); + public static IObservable ValidateSynchronization(this IObservable source) - // Using Raw observable and observer classes to bypass normal RX safeguards, which prevent out-of-sequence notifications. - // This allows the operator to be combined with TestableObserver, for correctness-testing of operators. + // Using Raw observable and observer classes to bypass normal RX safeguards + // This allows the operator to be combined with other operators that might be testing for things that the safeguards normally prevent. => RawAnonymousObservable.Create(observer => { var inFlightNotification = null as Notification; diff --git a/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs b/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs new file mode 100644 index 000000000..6e29b35f2 --- /dev/null +++ b/src/DynamicData.Tests/Utilities/RecordingObserverBase.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using System.Reactive.Concurrency; +using System.Reactive; + +using Microsoft.Reactive.Testing; + +namespace DynamicData.Tests.Utilities; + +// Using a custom implementing of IObserver<> to bypass normal RX safeguards, allowing invalid behaviors to be potentially tested for. +public abstract class RecordingObserverBase + : IObserver +{ + private readonly List>> _notifications; + private readonly IScheduler _scheduler; + + private Exception? _error; + private bool _hasCompleted; + + protected RecordingObserverBase(IScheduler scheduler) + { + _notifications = new(); + _scheduler = scheduler; + } + + public Exception? Error + => _error; + + public bool HasCompleted + => _hasCompleted; + + public bool HasFinalized + => _hasCompleted || (_error is not null); + + public IReadOnlyList>> Notifications + => _notifications; + + protected abstract void OnNext(T value); + + void IObserver.OnCompleted() + { + _notifications.Add(new( + time: _scheduler.Now.Ticks, + value: Notification.CreateOnCompleted())); + + _hasCompleted = true; + } + + void IObserver.OnError(Exception error) + { + _notifications.Add(new( + time: _scheduler.Now.Ticks, + value: Notification.CreateOnError(error))); + + if (!HasFinalized) + _error = error; + } + + void IObserver.OnNext(T value) + { + _notifications.Add(new( + time: _scheduler.Now.Ticks, + value: Notification.CreateOnNext(value))); + + OnNext(value); + } +} diff --git a/src/DynamicData.Tests/Utilities/TestableObserver.cs b/src/DynamicData.Tests/Utilities/TestableObserver.cs deleted file mode 100644 index e21058bde..000000000 --- a/src/DynamicData.Tests/Utilities/TestableObserver.cs +++ /dev/null @@ -1,46 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reactive; -using System.Reactive.Concurrency; - -using Microsoft.Reactive.Testing; - -namespace DynamicData.Tests.Utilities; - -public static class TestableObserver -{ - public static TestableObserver Create(IScheduler? scheduler = null) - => new(scheduler ?? DefaultScheduler.Instance); -} - -// Not using any existing Observer class, or Observer.Create() to bypass normal RX safeguards, which prevent out-of-sequence notifications. -public sealed class TestableObserver - : ITestableObserver -{ - private readonly List>> _messages; - private readonly IScheduler _scheduler; - - public TestableObserver(IScheduler scheduler) - { - _messages = new(); - _scheduler = scheduler; - } - - public IList>> Messages - => _messages; - - void IObserver.OnCompleted() - => _messages.Add(new( - time: _scheduler.Now.Ticks, - value: Notification.CreateOnCompleted())); - - void IObserver.OnError(Exception error) - => _messages.Add(new( - time: _scheduler.Now.Ticks, - value: Notification.CreateOnError(error))); - - void IObserver.OnNext(T value) - => _messages.Add(new( - time: _scheduler.Now.Ticks, - value: Notification.CreateOnNext(value))); -} diff --git a/src/DynamicData.Tests/Utilities/TestableObserverExtensions.cs b/src/DynamicData.Tests/Utilities/TestableObserverExtensions.cs deleted file mode 100644 index 757d6a71c..000000000 --- a/src/DynamicData.Tests/Utilities/TestableObserverExtensions.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reactive; - -using FluentAssertions; - -using Microsoft.Reactive.Testing; - -namespace DynamicData.Tests.Utilities; - -internal static class TestableObserverExtensions -{ - public static IEnumerable>> EnumerateInvalidNotifications(this ITestableObserver observer) - => observer.Messages - .SkipWhile(message => message.Value.Kind is NotificationKind.OnNext) - .Skip(1); - - public static IEnumerable EnumerateRecordedValues(this ITestableObserver observer) - => observer.Messages - .TakeWhile(message => message.Value.Kind is NotificationKind.OnNext) - .Select(message => message.Value.Value); - - public static Exception? TryGetRecordedError(this ITestableObserver observer) - => observer.Messages - .Where(message => message.Value.Kind is NotificationKind.OnError) - .Select(message => message.Value.Exception) - .FirstOrDefault(); - - public static bool TryGetRecordedCompletion(this ITestableObserver observer) - => observer.Messages - .Where(message => message.Value.Kind is NotificationKind.OnCompleted) - .Any(); -} diff --git a/src/DynamicData.Tests/Utilities/ValueRecordingObserver.cs b/src/DynamicData.Tests/Utilities/ValueRecordingObserver.cs new file mode 100644 index 000000000..51c53964d --- /dev/null +++ b/src/DynamicData.Tests/Utilities/ValueRecordingObserver.cs @@ -0,0 +1,23 @@ +using System.Collections.Generic; +using System.Reactive.Concurrency; + +namespace DynamicData.Tests.Utilities; + +public sealed class ValueRecordingObserver + : RecordingObserverBase +{ + private readonly List _recordedValues; + + public ValueRecordingObserver(IScheduler scheduler) + : base(scheduler) + => _recordedValues = new(); + + public IReadOnlyList RecordedValues + => _recordedValues; + + protected override void OnNext(T value) + { + if (!HasFinalized) + _recordedValues.Add(value); + } +}