Skip to content

Commit

Permalink
Remove try/finally from SelectMany in TransformAsync, which was causi…
Browse files Browse the repository at this point in the history
…ng race condition for some reason (#664)

Fix race condition after TransformAsync is applied.

Co-authored-by: Tomáš Filip <Devel.EPS@meac.cz>
  • Loading branch information
tomasfil and Tomáš Filip committed Nov 23, 2022
1 parent 1463f80 commit 59b3cac
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
39 changes: 39 additions & 0 deletions src/DynamicData.Tests/Cache/TransformAsyncFixture.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
Expand Down Expand Up @@ -88,6 +89,44 @@ public void Remove()
stub.Results.Data.Count.Should().Be(0, "Should be nothing cached");
}

[Fact]
public async Task RemoveFlowsToTheEnd()
{
int transform = 0;
int count = 500;
ReadOnlyObservableCollection<Person> collection;

var cache = new SourceCache<Person, string>(p => p.Name);
var people = Enumerable.Range(1, count).Select(l => new Person("Name" + l, l)).ToArray();

cache.Connect()
.TransformAsync(async person =>
{
try
{
await Task.Delay(Random.Shared.Next(1, 12));
return person;
}
finally
{
transform++;
}
})
.Bind(out collection)
.Subscribe();

foreach (var p in people)
{
cache.AddOrUpdate(p);
cache.RemoveKey(p.Name);
}

while (transform != count)
await Task.Delay(100);
await Task.Delay(3000);
collection.Count.Should().Be(0);
}

[Fact]
public void ReTransformAll()
{
Expand Down
19 changes: 7 additions & 12 deletions src/DynamicData/Cache/Internal/TransformAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

using DynamicData.Kernel;

Expand Down Expand Up @@ -38,36 +34,35 @@ public IObservable<IChangeSet<TDestination, TKey>> Run()
var cache = new ChangeAwareCache<TransformedItemContainer, TKey>();
var asyncLock = new SemaphoreSlim(1, 1);
var transformer = _source.SelectMany(async changes =>
var transformer = _source.Select(async changes =>
{
try
{
await asyncLock.WaitAsync();
return await DoTransform(cache, changes).ConfigureAwait(false);
}
finally
{
asyncLock.Release();
}
});
}).Concat();
if (_forceTransform is not null)
{
var locker = new object();
var forced = _forceTransform.Synchronize(locker).SelectMany(async shouldTransform =>
var forced = _forceTransform.Synchronize(locker)
.Select(async shouldTransform =>
{
try
{
await asyncLock.WaitAsync();
return await DoTransform(cache, shouldTransform).ConfigureAwait(false);
}
finally
{
asyncLock.Release();
}
});
}).Concat();
transformer = transformer.Synchronize(locker).Merge(forced);
}
Expand Down Expand Up @@ -159,9 +154,9 @@ public TransformedItemContainer(TSource source, TDestination destination)
Destination = destination;
}

public TSource Source { get; }

public TDestination Destination { get; }

public TSource Source { get; }
}

private sealed class TransformResult
Expand Down

0 comments on commit 59b3cac

Please sign in to comment.