Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Incorrect InnerJoin and RightJoin implementaion #318

Closed
rhy-ama opened this issue Jan 4, 2020 · 4 comments
Closed

[BUG] Incorrect InnerJoin and RightJoin implementaion #318

rhy-ama opened this issue Jan 4, 2020 · 4 comments

Comments

@rhy-ama
Copy link

rhy-ama commented Jan 4, 2020

This is just an example of one-to-many, where the a subrecord could be referenced/shared by many parent records.

Inner join might make sense not to return duplicates, but cardinality of right join should be that of the set on the right.

    public class SubRecord
    {
        public Guid id { get; set; }
        public string Desc { get; set; }
    }

    public class Record
    {
        public int id { get; set; }
        public Guid SubId { get; set; }
        public string Desc { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var recordsCache = new SourceCache<Record, int>(i => i.id);
            var subRecordsCache = new SourceCache<SubRecord, Guid>(i => i.id);

            subRecordsCache.Edit(cache =>
            {
                cache.AddOrUpdate(new SubRecord() { id = Guid.Parse("f6cebdba-9261-4730-87dc-3ec5c9c7a060"), Desc="sub one"});
                cache.AddOrUpdate(new SubRecord() { id = Guid.Parse("3d8e329d-dbab-4346-8ceb-a0112097469c"), Desc="sub two"});
                cache.AddOrUpdate(new SubRecord() { id = Guid.Parse("4c1f245f-a06f-4bb9-bcae-2d1589a99b21"), Desc="sub three"});
            });

            recordsCache.Edit(cache =>
            {
                cache.AddOrUpdate(new Record() { id = 1, SubId = Guid.Parse("f6cebdba-9261-4730-87dc-3ec5c9c7a060"), Desc = "parent 1 of sub one" });
                cache.AddOrUpdate(new Record() { id = 2, SubId = Guid.Parse("f6cebdba-9261-4730-87dc-3ec5c9c7a060"), Desc = "parent 2 of sub one" });
                cache.AddOrUpdate(new Record() { id = 3, SubId = Guid.Parse("4c1f245f-a06f-4bb9-bcae-2d1589a99b21"), Desc = "parent 3 of sub three" });
            });

            var innerJoinResultSubscription = subRecordsCache.Connect()
                                    .InnerJoin(recordsCache.Connect(), records => records.SubId, (SubRecord subrecord, Record record) => (r: record, s: subrecord))
                                    .Bind(out var innerJoinResult)
                                    .Subscribe();

            Console.WriteLine($"innerJoinResultList count {innerJoinResult.Count}:");
            innerJoinResult.ToObservable()
                .Subscribe(item =>
                {
                    Console.WriteLine($"---> {item.Item1.Desc}, {item.Item2.Desc} ");
                });

            //right join of subrecord to parents - expected result cardinality of n parent records
            var rightJointResultSubscription = subRecordsCache.Connect()
                                    .RightJoin(recordsCache.Connect(), records => records.SubId,
                                        (key, subrecord, record) =>
                                        {
                                            if(subrecord.HasValue)
                                            {
                                                return (record: record, subrecord: subrecord.Value);
                                            }
                                            else
                                            {
                                                return (record: record, subrecord: default(SubRecord));
                                            }
                                        })
                                    .Bind(out var rightJoinResult)
                                    .Subscribe();

            Console.WriteLine($"rightJoinResultList count {rightJoinResult.Count}:");
            rightJoinResult.ToObservable()
                .Subscribe(item =>
                {
                    Console.WriteLine($"---> {item.Item1.Desc}, {item.Item2.Desc} ");
                });
        }
    }

The result is invalid:

innerJoinResultList count 2:
---> parent 2 of sub one, sub one
---> parent 3 of sub three, sub three
rightJoinResultList count 2:
---> parent 2 of sub one, sub one
---> parent 3 of sub three, sub three

Expected result is rightJoinResult count of 3 or

innerJoinResultList count 3:
---> parent 1 of sub one, sub one
---> parent 2 of sub one, sub one
---> parent 3 of sub three, sub three
rightJoinResultList count 3:
---> parent 1 of sub one, sub one
---> parent 2 of sub one, sub one
---> parent 3 of sub three, sub three

Here is the corresponding SQL:

use tempdb;

CREATE TABLE #SubRecord
(
    id INT PRIMARY KEY,
    Name VARCHAR(50) NOT NULL,
);

CREATE TABLE #Record
(
    id INT PRIMARY KEY,
	SubId INT NOT NULL,
    Name VARCHAR(50) NOT NULL,
);

insert into #SubRecord values
	(1, 'sub one'),
	(2, 'sub two'),
	(3, 'sub three');

insert into #Record values
	(1, 1, 'parent 1 ref to sub 1'),
	(2, 1, 'parent 2 ref to sub 1'),
	(3, 3, 'parent 3 ref to sub 3');

select parent.Name Name, sub.Name SubName from #SubRecord sub inner join #Record parent on sub.id = parent.SubId; 
select count(*) from
	(select sub.id from #SubRecord sub inner join #Record parent on sub.id = parent.SubId) as innerjoinresult; 

select parent.Name Name, sub.Name SubName from #SubRecord sub right join #Record parent on sub.id = parent.SubId;
select count(*) from
	(select sub.id from #SubRecord sub right join #Record parent on sub.id = parent.SubId) as rightjoinresult;

and the result:

Inner Join: count 3
parent 1 ref to sub 1	sub one
parent 2 ref to sub 1	sub one
parent 3 ref to sub 3	sub three

Right Join: count 3
parent 1 ref to sub 1	sub one
parent 2 ref to sub 1	sub one
parent 3 ref to sub 3	sub three

PS: It looks like
var rightCache = right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache(false);
assumes that subrecords could not be shared by parent records ?? as ChangeKey eliminates all but one parents that share subrecords (unique key constraint???).

replacing RightJoin with this Transform produces expected result but I am unsure about concurrency issues

            var rightJoinResultSubscription = recordsCache.Connect()
                                                .Transform(record =>
                                                {
                                                    var subrecord = subRecordsCache.Lookup(record.SubId);
                                                    if (subrecord.HasValue)
                                                    {
                                                        return (record: record, subrecord: subrecord.Value);
                                                    }
                                                    else
                                                    {
                                                        return (record: record, subrecord: default(SubRecord));
                                                    }
                                                })
                                                .Bind(out var rightJoinResult)
                                                .Subscribe();

PS2: This is a follow-up to a closed question/issue about many-many relationships implementation #239
The correct solution there is to use a RightJoin instead of recommended InnerJoin

@rhy-ama rhy-ama changed the title [Bug] Incorrect RightJoin result on SourceCache [BUG] Incorrect RightJoin implementaion Jan 4, 2020
@rhy-ama rhy-ama changed the title [BUG] Incorrect RightJoin implementaion [BUG] Incorrect InnerJoin and RightJoin implementaion Jan 4, 2020
@hutterm
Copy link
Contributor

hutterm commented Jun 3, 2021

I face the same issue.

What documentation are we supposed to trust here? In the wiki https://github.com/reactivemarbles/DynamicData/wiki/Sql-style-joins it states that "These operators produces one-to-one data mappings"

while in

/// Joins the left and right observable data sources, taking all right values and combining any matching left values.

it states that it takes ALL right values

@hutterm
Copy link
Contributor

hutterm commented Jun 3, 2021

@RolandPheasant so I was working on this, and the following seems to work correctly for me. Feel free to use this in the library (and please take a look to see if you spot any immediate red flags in the code).
The most important change probably is, that RighJoin() should return something with TRightKey instead of TLeftKey

    public class CorrectRightJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination>
        where TLeftKey : notnull
        where TRightKey : notnull
    {
        private readonly IObservable<IChangeSet<TLeft, TLeftKey>> _left;

        private readonly Func<TRightKey, Optional<TLeft>, TRight, TDestination> _resultSelector;

        private readonly IObservable<IChangeSet<TRight, TRightKey>> _right;

        private readonly Func<TRight, TLeftKey> _rightKeySelector;

        public CorrectRightJoin(IObservable<IChangeSet<TLeft, TLeftKey>> left, IObservable<IChangeSet<TRight, TRightKey>> right,
            Func<TRight, TLeftKey> rightKeySelector, Func<TRightKey, Optional<TLeft>, TRight, TDestination> resultSelector)
        {
            _left = left ?? throw new ArgumentNullException(nameof(left));
            _right = right ?? throw new ArgumentNullException(nameof(right));
            _rightKeySelector = rightKeySelector ?? throw new ArgumentNullException(nameof(rightKeySelector));
            _resultSelector = resultSelector ?? throw new ArgumentNullException(nameof(resultSelector));
        }

        public IObservable<IChangeSet<TDestination, TRightKey>> Run()
        {
            return Observable.Create<IChangeSet<TDestination, TRightKey>>(
                observer =>
                {
                    var locker = new object();

                    // create local backing stores
                    var leftCache = _left.Synchronize(locker).AsObservableCache(false);
                    var rightCache = _right.Synchronize(locker).AsObservableCache(false);
                    var rightGrouped = _right.Synchronize(locker).GroupWithImmutableState(_rightKeySelector).AsObservableCache(false);

                    // joined is the final cache
                    var joinedCache = new LockFreeObservableCache<TDestination, TRightKey>();

                    var rightLoader = rightCache.Connect().Subscribe(
                        changes =>
                        {
                            joinedCache.Edit(
                                innerCache =>
                                {
                                    foreach (var change in changes.ToConcreteType())
                                    {
                                        var leftKey = _rightKeySelector(change.Current);
                                        switch (change.Reason)
                                        {
                                            case ChangeReason.Add:
                                            case ChangeReason.Update:
                                                // Update with right (and right if it is presents)
                                                var right = change.Current;
                                                var left = leftCache.Lookup(leftKey);
                                                innerCache.AddOrUpdate(_resultSelector(change.Key, left, right), change.Key);
                                                break;

                                            case ChangeReason.Remove:
                                                // remove from result because a right value is expected
                                                innerCache.Remove(change.Key);
                                                break;

                                            case ChangeReason.Refresh:
                                                // propagate upstream
                                                innerCache.Refresh(change.Key);
                                                break;
                                        }
                                    }
                                });
                        });

                    var leftLoader = leftCache.Connect().Subscribe(
                        changes =>
                        {
                            joinedCache.Edit(
                                innerCache =>
                                {
                                    foreach (var change in changes.ToConcreteType())
                                    {
                                        TLeft left = change.Current;
                                        var right = rightGrouped.Lookup(change.Key);

                                        if (right.HasValue)
                                            switch (change.Reason)
                                            {
                                                case ChangeReason.Add:
                                                case ChangeReason.Update:
                                                    foreach (var (key, value) in right.Value.KeyValues)
                                                        innerCache.AddOrUpdate(_resultSelector(key, left, value), key);
                                                    break;

                                                case ChangeReason.Remove:
                                                    foreach (var (key, value) in right.Value.KeyValues)
                                                        innerCache.AddOrUpdate(_resultSelector(key, Optional<TLeft>.None, value), key);
                                                    break;

                                                case ChangeReason.Refresh:
                                                    foreach (var key in right.Value.Keys)
                                                        innerCache.Refresh(key);
                                                    break;
                                            }
                                    }
                                });
                        });

                    return new CompositeDisposable(joinedCache.Connect().NotEmpty().SubscribeSafe(observer), leftCache, rightCache, rightLoader, joinedCache,
                        leftLoader);
                });
        }
    }

@RolandPheasant
Copy link
Collaborator

There's no obvious red flags. The key for me is with these changes do the existing tests pass, and also are there additional test cases which should pass?

@github-actions
Copy link

This issue has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 23, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants