Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into CP_EnhanceCodeFormati…
Browse files Browse the repository at this point in the history
…ngRequirements
  • Loading branch information
ChrisPulman committed Dec 6, 2023
2 parents 97528de + f492247 commit 3a39aac
Show file tree
Hide file tree
Showing 11 changed files with 1,219 additions and 232 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ jobs:
run: dotnet build --no-restore --configuration Release DynamicData.sln
working-directory: src

- name: Run Tests
run: dotnet test --no-restore --configuration Release DynamicData.sln
working-directory: src

- name: Pack
run: dotnet pack --no-restore --configuration Release DynamicData.sln
working-directory: src
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
206 changes: 197 additions & 9 deletions src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand All @@ -15,6 +17,63 @@ namespace DynamicData.Tests.Cache;
public class ToObservableChangeSetFixture
: ReactiveTest
{
[Fact]
public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded()
{
using var source = new Subject<Item>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
source.OnNext(item1);
scheduler.AdvanceBy(1);

// Extend the expiration to a later time
var item2 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(20) };
source.OnNext(item2);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "2 items were emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "2 items were emitted, 1 of which was a replacement");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "no changes should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred, since the last check");

// Shorten the expiration to an earlier time (5ms from now is 15m total)
var item3 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) };
source.OnNext(item3);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "1 item was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item3 }, "1 item was replaced, since the last check");

// One more update with no changes to the expiration
var item4 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) };
source.OnNext(item4);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(4, "1 item was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "1 item was replaced, since the last check");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(15).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(5, "1 expiration should have occurred, since the last check");
results.Data.Items.Should().BeEmpty("the last item should have expired, since the last check");
}

[Fact]
public void ExpirationIsGiven_RemovalIsScheduled()
{
Expand Down Expand Up @@ -112,15 +171,88 @@ public void ExpirationIsGiven_RemovalIsScheduled()
results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired");
}

[Fact]
public void ItemIsEvictedBeforeExpiration_ExpirationIsCancelled()
{
using var source = new Subject<IEnumerable<Item>>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 3,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
var item2 = new Item() { Id = 2, Lifetime = TimeSpan.FromMilliseconds(10) };
var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(10) };
source.OnNext(new[] { item1, item2, item3 });
scheduler.AdvanceBy(1);

var item4 = new Item() { Id = 4 };
source.OnNext(new[] { item4 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "2 item sets were emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "the size limit of the collection was 3");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "2 items should have expired, at the same time, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "2 items should have expired, since the last check");
}

[Fact]
public void ItemExpiresBeforeEviction_EvictionIsSkipped()
{
using var source = new Subject<IEnumerable<Item>>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 3,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
var item2 = new Item() { Id = 2 };
var item3 = new Item() { Id = 3 };
source.OnNext(new[] { item1, item2, item3 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(1, "1 item set was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "the size limit of the collection was 3");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "1 expiration should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have expired");

var item4 = new Item() { Id = 4 };
source.OnNext(new[] { item4 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "1 item set was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "no eviction should have occurred");
}

[Fact]
public void KeySelectorIsNull_ThrowsException()
=> FluentActions.Invoking(() => ObservableCacheEx.ToObservableChangeSet<object, object>(
source: new Subject<object>(),
keySelector: null!))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Outstanding bug, error re-throws, instead of emitting on the stream")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void KeySelectorThrows_SubscriptionReceivesError()
{
using var source = new Subject<Item>();
Expand All @@ -142,8 +274,31 @@ public void KeySelectorThrows_SubscriptionReceivesError()
results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was emitted before an error occurred");
}

[Fact(Skip = "Outstanding bug, completion is not forwarded")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void LimitToSizeIs0_ChangeSetsAreEmpty()
{
using var source = new Subject<Item>();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
limitSizeTo: 0));

var item1 = new Item() { Id = 1 };
source.OnNext(item1);

var item2 = new Item() { Id = 2 };
source.OnNext(item2);

var item3 = new Item() { Id = 3 };
source.OnNext(item3);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "3 items were emitted");
results.Data.Items.Should().BeEmpty("the size limit of the collection was 0");
}

[Fact]
public void RemovalsArePending_CompletionWaitsForRemovals()
{
using var source = new Subject<IEnumerable<Item>>();
Expand All @@ -164,19 +319,20 @@ public void RemovalsArePending_CompletionWaitsForRemovals()

source.OnCompleted();

results.Error.Should().BeNull();
results.Completed.Should().BeFalse("item removals have been scheduled, and not completed");
results.Messages.Count.Should().Be(1, "1 item set was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were emitted");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(30).Ticks);

results.Error.Should().BeNull();
results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain");
results.Messages.Count.Should().Be(3, "2 expirations should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "3 items were emitted, and 2 should have expired");
}

[Fact(Skip = "Outstanding bug, https://github.com/reactivemarbles/DynamicData/issues/635")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void SourceCompletesImmediately_SubscriptionCompletes()
{
var item = new Item() { Id = 1 };
Expand All @@ -191,6 +347,7 @@ public void SourceCompletesImmediately_SubscriptionCompletes()
using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(static item => item.Id));

results.Error.Should().BeNull();
results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain");
results.Messages.Count.Should().Be(1, "1 item was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item }, "1 item was emitted");
Expand Down Expand Up @@ -246,12 +403,11 @@ public void SizeLimitIsExceeded_OldestItemsAreRemoved()
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(9, "6 item sets were emitted by the source, 3 of which triggered followup evictions");
results.Messages.Count.Should().Be(6, "6 item sets were emitted by the source");
results.Data.Items.Should().BeEquivalentTo(new[] { item5, item6, item9, item10, item11 }, "the size limit of the collection was 5");
}

[Fact(Skip = "Outstanding bug, notifications are not synchronized, initial item emits after error")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void SourceErrorsImmediately_SubscriptionReceivesError()
{
var item = new Item() { Id = 1 };
Expand Down Expand Up @@ -322,6 +478,38 @@ public void SourceIsNull_ThrowsException()
keySelector: static item => item))
.Should().Throw<ArgumentNullException>();

[Fact]
public void ThreadPoolSchedulerIsUsed_ExpirationIsThreadSafe()
{
var testDuration = TimeSpan.FromSeconds(1);
var maxItemLifetime = TimeSpan.FromMilliseconds(50);

using var source = new Subject<Item>();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 1000,
scheduler: ThreadPoolScheduler.Instance));

var nextItemId = 1;
var rng = new Random(Seed: 1234567);

var stopwatch = new Stopwatch();
stopwatch.Start();
while (stopwatch.Elapsed < testDuration)
{
source.OnNext(new()
{
Id = nextItemId++,
Lifetime = TimeSpan.FromMilliseconds(rng.Next(maxItemLifetime.Milliseconds + 1))
});
}

results.Error.Should().BeNull();
}

public class Item
{
public int Id { get; init; }
Expand Down

This file was deleted.

Loading

0 comments on commit 3a39aac

Please sign in to comment.