Proposal: attack and release times for refCount #171

Closed
staltz opened this issue Aug 14, 2015 · 29 comments

Comments

@staltz
Member

staltz commented Aug 14, 2015

This is a non-breaking change proposal to extend refCount() with two optional parameters, attackTime and releaseTime.

refCount(attack?: number, release?: number)

The motivation: I have discovered a use case where I need a connectable observable to get connected, but not get disconnected/reconnected if the number of observers goes from 1 to 0 and then from 0 to 1 synchronously (in the same event loop tick). In the link above, I just used connect() without disconnecting it, which is bad for memory management. I can imagine other use cases where I'd like to delay the auto-disconnect implicit in refCount.

For symmetry with release, I also included attack times in case that makes sense in some case.

From the Rx.NET gitter chat Lee Campbell also mentioned the existence of a LazyConnect operator they have which would be refCount(0, Infinity) ("connect when first observer subscribes, but never disconnect").

What do you think? This is definitely an advanced feature that most people don't need to use, but as a silent addition I think it's neat to avoid the imperative connect() whenever naive refCount() isn't enough.

@benlesh
Member

benlesh commented Aug 14, 2015

I actually have a use case for this as well, involving WebSockets.

It seems like it should almost be a different operator/method. If I understand your proposal correctly, you want to add an optional temporal nature to refCount, which by name only implies that reference counting is done. Further bikeshedding: I'm not sure what "attack" means. Since it's just a parameter name, you can make it as descriptive as you like. Maybe this is a persistentRefCount, or a refCountWithDelay or the like? I don't mind it being an overload of refCount, it just seems that an overload shouldn't completely change the nature of the operator. But that's one person's opinion.

A totally different angle is maybe there is a use case for a DelayedCompletionSubject that embodies this behavior. Again, I think this would be useful for WebSockets, which I've been wrapping in Subjects.

@staltz
Member Author

staltz commented Aug 14, 2015

refCount(200, 0) would mean "when the number of observers goes from 0 to 1, wait for 200ms then connect this ConnectableObservable, if by that time it still has a positive number of observers".

it just seems that an overload shouldn't completely change the nature of the operator.

It wouldn't change the nature of the operator, because refCount(0, 0) is precisely what typical refCount() does, but giving a second thought to it, maybe refCountWithDelay would be better to help people discover this functionality (as opposed to having it hidden as a shady parameter).
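
For readers following along, here is a minimal sketch of the attack/release semantics described above, written without RxJS so that it stays self-contained. Everything in it is an assumption made for illustration: createDelayedRefCount, retain, release and the injectable schedule function are hypothetical names, not part of any RxJS release. With attackMs = 0 and releaseMs = 0 it behaves like plain reference counting, and releaseMs = Infinity gives the "connect once, never disconnect" behavior mentioned for LazyConnect.

```javascript
// Default timer: schedules `fn` after `delay` ms and returns a cancel function.
const defaultSchedule = (fn, delay) => {
  const id = setTimeout(fn, delay);
  return () => clearTimeout(id);
};

// Hypothetical helper (not an RxJS API). `connect` must return a disconnect function.
function createDelayedRefCount(connect, attackMs = 0, releaseMs = 0, schedule = defaultSchedule) {
  let count = 0;            // current number of observers
  let disconnect = null;    // non-null while connected
  let cancelPending = null; // cancels a pending connect or disconnect, if any

  function clearPending() {
    if (cancelPending) {
      cancelPending();
      cancelPending = null;
    }
  }

  // Runs `action` now when delay is 0, never when it is Infinity, otherwise later.
  function runAfter(delay, action) {
    clearPending();
    if (delay === Infinity) return;
    if (delay <= 0) {
      action();
      return;
    }
    cancelPending = schedule(() => {
      cancelPending = null;
      action();
    }, delay);
  }

  return {
    // Call when an observer subscribes.
    retain() {
      count += 1;
      if (count !== 1) return;
      if (disconnect) {
        clearPending(); // an observer came back before the release time elapsed
      } else {
        runAfter(attackMs, () => {
          if (count > 0 && !disconnect) disconnect = connect();
        });
      }
    },
    // Call when an observer unsubscribes.
    release() {
      if (count === 0) return; // ignore unbalanced calls
      count -= 1;
      if (count !== 0) return;
      if (disconnect) {
        runAfter(releaseMs, () => {
          if (count === 0 && disconnect) {
            disconnect();
            disconnect = null;
          }
        });
      } else {
        clearPending(); // nobody is left, so cancel the pending connect
      }
    },
    get connected() {
      return disconnect !== null;
    },
  };
}
```

Passing the scheduler in as a parameter keeps the sketch testable with a virtual clock, which is one way to avoid hard-coding setTimeout.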

@staltz
Member Author

staltz commented Aug 14, 2015

Yet another thing to consider: by giving refCount a temporal nature, we are implicitly assuming the setTimeout scheduler. Maybe it would make sense to keep this scheduler-independent by doing something like pausable: another Observable (the "pause") of booleans dictates when to connect/disconnect. Something like refCountWhen(gate$), where the ConnectableObservable connects/disconnects like this (ignore the means of achieving this, look at the semantics of connection only):

Let numObservers$ be the (theoretical) Observable emitting the number of observers of the ConnectableObservable.

The ConnectableObservable connects when

combineLatest(numObservers$, gate$, (num, gate) => ({num, gate}))
  .filter(({num, gate}) => num > 0 && gate); 

emits next, and disconnects when

combineLatest(numObservers$, gate$, (num, gate) => ({num, gate}))
  .filter(({num, gate}) => num === 0 && !gate); 

emits next.

Not sure if this is too generic to be practical. Just tossed ideas.
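
The two conditions above can be read as a small state machine. The sketch below is only an illustration of those semantics over plain snapshots of {num, gate}; nextConnected and traceConnection are hypothetical names, and nothing here is an RxJS operator. It makes one consequence visible: when neither condition holds (observers present but the gate closed, or no observers but the gate still open), the connection state is left unchanged.

```javascript
// Hypothetical reducer: computes the next connection state from the current
// state, the number of observers and the latest gate value.
function nextConnected(connected, numObservers, gate) {
  if (numObservers > 0 && gate) return true;     // connect condition
  if (numObservers === 0 && !gate) return false; // disconnect condition
  return connected;                              // otherwise keep the current state
}

// Folds a list of {num, gate} snapshots into the sequence of connection states,
// starting from a disconnected state.
function traceConnection(snapshots) {
  let connected = false;
  return snapshots.map(({ num, gate }) => {
    connected = nextConnected(connected, num, gate);
    return connected;
  });
}

// Usage: the gate staying open after the last observer leaves delays the disconnect.
const states = traceConnection([
  { num: 1, gate: false }, // observer present, gate closed: stay disconnected
  { num: 1, gate: true },  // connect
  { num: 0, gate: true },  // last observer left, gate still open: stay connected
  { num: 0, gate: false }, // gate closes with no observers: disconnect
]);
console.log(states);
```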

@benlesh
Member

benlesh commented Aug 14, 2015

I like the idea of using another Observable (which can obviously be scheduled) to trigger:

someObservable.publish().refCountCompleteWhen(completions => completions.delay(200))

Where completions is an observable that emits synchronously when the refCount hits zero, and the value returned by the notifier function is an Observable that, when it nexts or completes, tells the ConnectableObservable to disconnect if the refCount is still 0. Errors should still propagate down the error path.

Functionality like this would be enough for me to use inside of my AnonymousSubject I use to create the WebSocket subject. So a new type of Subject (as I was suggesting before) probably isn't necessary.

@benlesh
Member

benlesh commented Aug 14, 2015

Was talking with some folks around here about it, and this would be possible to do with current operators if RxJS had a doOnUnsubscribe. The idea being that you'd maintain the state in that way, rather than introduce another method or override to the API.

@staltz
Member Author

staltz commented Aug 14, 2015

doOnUnsubscribe 👍 we need that. I think I needed it this week and was disappointed to discover it doesn't exist (while it does in RxJava). I used finally as a replacement for doOnUnsubscribe.

@trxcllnt
Member

@Blesh @staltz how is doOnUnsubscribe different from finally?

@trxcllnt
Member

@staltz I've never liked refCount, as it seems like a specialized case of a broader host of what I've been calling connection strategies. Just like there's many possible flattening strategies (merge, concat, switch), buffering strategies (zip, combine, toArray, join), I suspect there's a host of unexplored connection strategies that control when to go from cold to hot.

I've thought about overloading the ConnectableObservable.prototype.connect with a connectionSelector argument. Maybe something like this?

const numbers = Observable
  .range(0, Number.POSITIVE_INFINITY)
  .map((x) => Math.random())
  .publish();

numbers
  .zip(numbers.skip(1), (x, y) => x * y)
  .subscribe(::console.log);

numbers.connect(function connectionStrategy(source, subject) {
  var connection = null;
  var hasConnected = false;
  return Observable.create(function(observer) {
    var subscription = subject.subscribe(observer);
    if(subject.observers.length === 2 && !hasConnected) {
      connection = source.subscribe(subject);
    }
    return function() {
      subscription.unsubscribe();
      if(subject.observers.length === 0) {
        hasConnected = false;
        connection.unsubscribe();
        connection = null;
      }
    }
  })
});

@benlesh
Member

benlesh commented Aug 14, 2015

@Blesh @staltz how is doOnUnsubscribe different from finally?

finally happens after the unsubscription call, whereas doOnUnsubscribe would happen just before. (I believe, it's an RxJava thing)

@benlesh
Member

benlesh commented Aug 14, 2015

@trxcllnt ... I like your proposal. I'd be interested in what RxJava folks like @benjchristensen would think about it. Just to get a different perspective.

@staltz
Member Author

staltz commented Aug 14, 2015

I've thought about overloading the ConnectableObservable.prototype.connect with a connectionSelector argument. Maybe something like this?

numbers.connect(function connectionStrategy(source, subject) {
  var connection = null;
  var hasConnected = false;
  return Observable.create(function(observer) {
    var subscription = subject.subscribe(observer);
    if(subject.observers.length === 2 && !hasConnected) {
      connection = source.subscribe(subject);
    }
    return function() {
      subscription.unsubscribe();
      if(subject.observers.length === 0) {
        hasConnected = false;
        connection.unsubscribe();
        connection = null;
      }
    }
  })
});

Good idea.
(Bike-shed: probably missing hasConnected = true; there when the source is subscribed)
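
A self-contained sketch of the same connection strategy with the missing assignment added. It assumes a tiny stand-in for Subject (TinySubject) and a hypothetical connectAtThreshold helper, neither of which is RxJS API; the proposed connect(selector) overload does not exist, so the selector is called directly here. It also guards the teardown so that connection.unsubscribe() is not called when the threshold was never reached, which the snippet above would hit if a lone observer unsubscribed.

```javascript
// Minimal stand-in for a Subject, for illustration only (not the RxJS class).
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(observer) {
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        const i = this.observers.indexOf(observer);
        if (i !== -1) this.observers.splice(i, 1);
      },
    };
  }
  next(value) {
    // Copy the list so that unsubscribing during delivery is safe.
    this.observers.slice().forEach((o) => o.next(value));
  }
}

// Hypothetical strategy: connect `source` to `subject` once `threshold`
// observers are subscribed, and disconnect when no observers remain.
function connectAtThreshold(source, subject, threshold) {
  let connection = null;
  let hasConnected = false;
  return {
    subscribe(observer) {
      const subscription = subject.subscribe(observer);
      if (subject.observers.length === threshold && !hasConnected) {
        hasConnected = true; // the assignment missing from the snippet above
        connection = source.subscribe(subject);
      }
      return {
        unsubscribe() {
          subscription.unsubscribe();
          if (subject.observers.length === 0 && hasConnected) {
            hasConnected = false;
            connection.unsubscribe();
            connection = null;
          }
        },
      };
    },
  };
}
```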

@trxcllnt
Member

@staltz @Blesh @benjchristensen note, connect with a selector function would behave like refCount does, returning an Observable not a Subscription. This is possibly a large enough reason to name it something other than connect.

@staltz
Member Author

staltz commented Aug 14, 2015

Yes, got it.

@benlesh
Member

benlesh commented Aug 14, 2015

I feel like there's a reason why this hasn't been done and we're not seeing it... cc/@jhusain and @mattpodwysocki for input.

@staltz
Member Author

staltz commented Aug 14, 2015

Yeah, maybe we need to question the underlying assumption ("why do you need a hot behaving like that? do you even need a hot?").

Summoning @headinthebox also.

@benjchristensen
Member

Related to this is a new ConnectableObservable.autoConnect feature added to RxJava in 1.0.13: https://github.com/ReactiveX/RxJava/blob/1877fa7bbc176029bcb5af00d8a7715dfbb6d373/src/main/java/rx/observables/ConnectableObservable.java#L96

I'm less convinced of involving time in a 1st class overload of refCount. Once time or other conditions for disconnecting get involved, that feels like the application should just directly use the building blocks to achieve the needed behavior, and not directly change ConnectableObservable. The connect/disconnect semantics already support any custom behavior an app might need.

@benlesh
Member

benlesh commented Aug 14, 2015

@benjchristensen ... I'm more interested in what you think of @trxcllnt's proposal. I find it really interesting. The ability to specify a strategy for subscribing and unsubscribing from the source and the observable in its entirety is pretty nice.

@staltz
Member Author

staltz commented Aug 15, 2015

Yeah, I'm looking into how to make variants of refCount, not necessarily put time conditions in it. Paul Taylor's connect would be ideal, because I think we can't emulate refCount in a lift (or, can we?).

@staltz
Member Author

staltz commented Aug 27, 2015

I would really like to see Paul's new connect(). It seems like Ben Lesh agrees too, so I'm guessing we have enough quorum to approve it? Or do we need more feedback?

@benlesh
Member

benlesh commented Aug 27, 2015

@staltz @trxcllnt I really like it... we probably need to bike-shed the name a little. The other thing that is up for debate with this is whether or not to leave the current connect behavior in there if we can make something like this that returns a "cold until hot" observable. I suspect the answer is "yes", but I'd like more feedback from @mattpodwysocki and @headinthebox here, since it could be a change to a critical piece of Rx.

@trxcllnt
Member

@benjchristensen @Blesh @staltz If we're bike-shedding, I think control sounds like an appropriate name for what I proposed.

@staltz
Member Author

staltz commented Aug 28, 2015

Sounds good to me

@benlesh
Member

benlesh commented Sep 18, 2015

@trxcllnt, can you create an issue for your connect operator? I'm going to close this issue as it's gotten stale.

@benlesh benlesh closed this as completed Sep 18, 2015
@marinho

marinho commented Aug 31, 2016

Hi guys, I need that, but it's unclear to me how your discussion ended up. What is the resulting operator with Paul's connection strategies?

@trxcllnt
Member

trxcllnt commented Sep 1, 2016

@marinho I didn't get around to implementing a generic controlling operator, but you can definitely achieve any behavior you like with Observable.using and some local selector state.

@NMinhNguyen

@trxcllnt could you please elaborate on what you meant by

local selector state.

perhaps by providing a small example?

@trxcllnt
Member

trxcllnt commented Dec 19, 2016

@NMinhNguyen

source
  .multicast(() => new Subject())
  .let((source) => {
    let connection, subscriptions = new Subscription();
    return Observable.using(
      () => {
        const localSubscription = new Subscription(() => {
          subscriptions.remove(localSubscription);
          if (connection && subscriptions.length === 3 /* <-- disconnect when the subscriber count drops to 2 (+1 for the connection subscription) */) {
            subscriptions.unsubscribe();
            subscriptions = new Subscription();
            connection = null;
          }
        });
        subscriptions.add(localSubscription);
        if (!connection && subscriptions.length === 5 /*<- connect on the 5th subscription */) {
          subscriptions.add(connection = source.connect());
        }
        return localSubscription;
      },
      () => source
    );
  })

@NMinhNguyen

NMinhNguyen commented Feb 3, 2017

@trxcllnt Thanks a lot for that! Really insightful. So I finally got round to trying it out, and it seems like there is no length property on Subscription objects. There is however a private array of subscriptions - subscriptions._subscriptions, but accessing it seems like bad practice? Would you say it's better to manually track the number of subscriptions?
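
One possible shape for manual tracking, as a sketch only: it keeps its own subscriber counter instead of reading a length off Subscription or reaching into the private _subscriptions array. connectBetween is a hypothetical helper, and the connectable argument is assumed to be any object whose subscribe(observer) and connect() both return something with unsubscribe(). Unlike the snippet above, it only drops the connection when the count falls to disconnectAt and leaves the remaining subscribers attached.

```javascript
// Hypothetical helper: connects when the subscriber count reaches `connectAt`
// and disconnects when it drops back to `disconnectAt`.
function connectBetween(connectable, connectAt, disconnectAt) {
  let subscribers = 0;   // tracked manually, no reliance on Subscription internals
  let connection = null; // non-null while connected

  return {
    subscribe(observer) {
      const inner = connectable.subscribe(observer);
      subscribers += 1;
      if (!connection && subscribers === connectAt) {
        connection = connectable.connect();
      }
      let closed = false;
      return {
        unsubscribe() {
          if (closed) return; // make repeated calls harmless
          closed = true;
          inner.unsubscribe();
          subscribers -= 1;
          if (connection && subscribers === disconnectAt) {
            connection.unsubscribe();
            connection = null;
          }
        },
      };
    },
  };
}
```

With connectAt = 5 and disconnectAt = 2 this follows the thresholds used in the example above.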

@lock

lock bot commented Jun 6, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 6, 2018