Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Does shareReplay's teardown function get called? #3127

Closed
andrewkshim opened this issue Nov 21, 2017 · 12 comments
Closed

Does shareReplay's teardown function get called? #3127

andrewkshim opened this issue Nov 21, 2017 · 12 comments

Comments

@andrewkshim
Copy link

andrewkshim commented Nov 21, 2017

RxJS version:
5.5.2

Code to reproduce:
This is more of a question so I don't have a reproducible example.

Expected behavior:
Same as above.

Actual behavior:
Same as above.

Additional information:
At first, I encountered what I thought was a bug with the shareReplay operator, but after finding an older issue that brought up the same concerns, I discovered that it was a misunderstanding on my part about how shareReplay was supposed to work.

However, in the process, I dug through some of the source code and now I have a question about the teardown logic in the shareReplay operator:

https://github.com/ReactiveX/rxjs/blob/e159578eda80a96bb68b83418f503428cc23aa7f/src/operators/shareReplay.ts#L43...L49

Since the shareReplay operator does not create its own Subscriber, it looks like that teardown logic isn't getting attached to anything. Will the teardown logic ever get called?

Thanks for all the hard work!

@martinsik
Copy link
Contributor

martinsik commented Nov 21, 2017

The teardown logic is called every time an observer unsubscribes. Just this small part gets called only if the source Observable completed https://github.com/ReactiveX/rxjs/blob/master/src/operators/shareReplay.ts#L47.

@andrewkshim
Copy link
Author

Thanks for the quick response.

Can you elaborate — for shareReplay specifically, how does any Subscriber access the teardown function?

My understanding is that the teardown function needs to get attached to a Subscriber in order for it to be called during an unsubscribe. That attachment happens during Observable.subscribe at L200 to L204:

/* L200 */     if (operator) {
/* L201 */         operator.call(sink, this.source);
/* L202 */     } else {
/* L203 */         sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
/* L204 */     }

When we use shareReplay, L201 executes, so we'll be executing the shareReplayOperation function. That function returns the teardown function, but the teardown function does not subsequently get used.

Many other operators instantiate a Subscriber object, which takes care of attaching the teardown function in its constructor, but shareReplay is not instantiating a Subscriber, so what is the mechanism by which its teardown function gets attached?

@martinsik
Copy link
Contributor

martinsik commented Nov 21, 2017

Hmm, I think you might be right. This line https://github.com/ReactiveX/rxjs/blob/master/src/operators/shareReplay.ts#L43 should be probably this:

this.add(() =>
   refCount--;
   ...
};

instead of this:

return () => {
   refCount--;
   ...
};

@cartant
Copy link
Collaborator

cartant commented Nov 22, 2017

Actually, as the this context is already a Subscriber, the innerSub subscription seems unnecessary -
the subscription to the ReplaySubject will be added to the Subscriber when this is passed to subscribe. Unsubscription from the subject will occur when the Subscriber is unsubscribed. So the implementation could be:

subject.subscribe(this);

this.add(() => {
  refCount--;
  if (subscription && refCount === 0 && isComplete) {
    subscription.unsubscribe();
  }
});

I was tempted to write a PR for this, but with everything of interest in a closure, I can't see how to write an initial, failing test.

It's also awkward as the unsubscription from the source can only occur if the source has completed - at which time the unsubscription is redundant. As far as I can see, the refCount is unnesessary. In fact, the entire teardown looks to be redundant, to me.

@martinsik
Copy link
Contributor

martinsik commented Nov 23, 2017

I think this demo shows not exactly the problem described here but it's not would I expect to happen:

Observable.interval(250)
    .finally(i => console.log('finally 1'))
    .shareReplay(1)
    .take(5)
    .finally(i => console.log('finally 2'))
    .subscribe(console.log);

This prints the following output:

0
1
2
3
4
finally 2

I'd expect that finally 1 would be called too but it never does (even when I used this.add with the dispose callback like I mentioned above). This is because isComplete is false when it's called.

I understand this was the point of of PR #2910 but it requires me to always complete the chain before using shereReplay (.take(5).shareReplay(1) instead of .shareReplay(1).take(5)) which is hard to achieve in practise.

@martinsik
Copy link
Contributor

Is there any resolution on this issue? I've bumped into exactly the same thing yesterday and was wondering why it doesn't work...

@cartant
Copy link
Collaborator

cartant commented Jan 5, 2018

@martinsik My two cents: I think it's all down to refCount not pleasing all of the people all of the time -
and, now, the reference count is essentially ignored for shareReplay. If you've not seen it, Paul Taylor made an interesting comment on refCount here.

However, I, too, would be interesting in knowing whether the problem you've highlighted was considered when PR #2910 was made. The change in behaviour was a surprise to me.

@Airblader
Copy link
Contributor

The refCount is still relevant for subscribing the inner observable, no? So I don't think it's entirely ignored. I actually came to find that what I was looking for in #3238 is exactly the new behavior of shareReplay*: connect on subscribe, cache values even if the refCount goes to 0, but make the observable retryable in case of errors.

In my case this is exactly how caching is supposed to work because I want to cache values even if there are no subscribers for a little while.

*) Going off of the descriptions, I haven't yet tested it.

@Airblader
Copy link
Contributor

FYI, in my case I complete the chain by using takeUntil with an observable that emits when the cache should be torn down.

@cartant
Copy link
Collaborator

cartant commented Jan 21, 2018

@Airblader As far as I can see, refCount is only used to control the unsubscription from the source. However, the unsubscription is only allowed if the source has completed. This seems redundant, as if the source has completed (or errored) the subscription will have been automatically unsubscribed, as that's part of the observable contract.

@Airblader
Copy link
Contributor

As far as I can see, refCount is only used to control the unsubscription from the source.

I hadn't actually checked the source yet and just assumed the refCount was also used to ensure connecting on the first subscription (since refCount() as an operator does exactly that).

This seems redundant, as if the source has completed (or errored) the subscription will have been automatically unsubscribed, as that's part of the observable contract.

Point taken and agreed. :-)

@cartant
Copy link
Collaborator

cartant commented Aug 27, 2018

Closing this in favour of #3336.

@cartant cartant closed this as completed Aug 27, 2018
@lock lock bot locked as resolved and limited conversation to collaborators Sep 26, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants