Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#235 flatMapLatest adding overlapping option #236

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs-src/descriptions/multiple-sources.jade
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ div



+descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn])').
+descr-method('flat-map-latest', 'flatMapLatest', 'obs.flatMapLatest([fn], [options])').
Like #[b flatMap], but repeats events only from the latest added observable
i.e., switching from one observable to another.

Expand Down Expand Up @@ -433,6 +433,14 @@ pre(title='events in time').
spawned 3: ---3---3---3---3X

result: -------------1---1-----2---2-----3---3---3---3X

p.
By default, #[b flatMapLatest] will remove the previous observable #[i before]
adding the next. This may cause an obversable to deactivate briefly when switching
between observable, even if the observable is in the activation chain of both the
previous and next observable. If you want the next observable to be added #[i before]
removing the previous, you can pass #[tt {overlapping: true}] as #[b options].

div


Expand Down
3 changes: 2 additions & 1 deletion kefir.js.flow
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ declare class Observable<+V,+E=*> {
concat<V2,E2>(otherObs: Observable<V2,E2>): Observable<V|V2,E|E2>;

flatMap<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(options?: {overlapping?: boolean}): Observable<V2,E|E2>;
flatMapLatest<V2,E2>(transform: (value: V) => Observable<V2,E2>, options?: {overlapping?: boolean}): Observable<V2,E|E2>;
flatMapFirst<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapConcat<V2,E2>(transform: (value: V) => Observable<V2,E2>): Observable<V2,E|E2>;
flatMapConcurLimit<V2,E2>(transform: (value: V) => Observable<V2,E2>, limit: number): Observable<V2,E|E2>;
Expand Down
6 changes: 4 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,10 @@ import FlatMap from './many-sources/flat-map';
Observable.prototype.flatMap = function(fn) {
return new FlatMap(this, fn).setName(this, 'flatMap');
};
Observable.prototype.flatMapLatest = function(fn) {
return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest');

import flatMapLatest from './many-sources/flat-map-latest';
Observable.prototype.flatMapLatest = function(...args) {
return flatMapLatest(this, ...args);
};
Observable.prototype.flatMapFirst = function(fn) {
return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst');
Expand Down
14 changes: 11 additions & 3 deletions src/many-sources/abstract-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co

const id = (x => x);

function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) {
function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) {
Stream.call(this);

this._queueLim = queueLim < 0 ? -1 : queueLim;
this._concurLim = concurLim < 0 ? -1 : concurLim;
this._drop = drop;
this._overlapping = overlapping;
this._queue = [];
this._curSources = [];
this._$handleSubAny = (event) => this._handleSubAny(event);
Expand All @@ -34,8 +35,15 @@ inherit(AbstractPool, Stream, {
if (this._queueLim === -1 || this._queue.length < this._queueLim) {
this._addToQueue(toObs(obj));
} else if (this._drop === 'old') {
this._removeOldest();
this._add(obj, toObs);
if (this._overlapping) {
this._addToCur(toObs(obj));
while (this._curSources.length > this._concurLim) {
this._removeOldest();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this loop seemed a little weird when I was working on it, however, I think to have correct parity with the other side of the condition it needs to be here. Given that this._curSources.length - this._concurLim > 1, the other side, implicitly loops due to recursion. Since we want to add first, using _addToCur, then we need to make sure that we aren't over the _concurLim and blindly removing one might be not enough. Although, I'm not sure how an AbstractPool could get into that state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just remove one. Yes, there is a recursion on the other side, but it should always do only one iteration because:

  1. We always expect this condition to be true: this._curSources.length <= this._concurLim.
  2. When we get to this branch of the code this._curSources.length === this._concurLim should be true.
  3. We remove one from this._curSources, so now this._curSources.length === this._concurLim - 1.
  4. At the following this._add() call, we fall into branch at line 33. And that is the end.

But more importantly, with the condition you added we might actually remove none of the oldest. After this._addToCur(toObs(obj)) we not necessarily have an observable added to _curSources because if toObs(obj) already anded or ends immediately after we subscribe to it, we don't add it to the _curSources (take a look at _addToCur). Btw, we need a test case for this.

Copy link
Member

@rpominov rpominov Feb 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also maybe it make sense to remove recursion on the other side too.

  this._removeOldest();
- this._add(obj, toObs);
+ this._addToCur(toObs(obj));

This way we get the correct parity.

Copy link
Contributor Author

@32bitkid 32bitkid Feb 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure thats quite equivalent; there may not be a public api to aggravate the condition, but I think that when { concurLim: 1, queueLim: 1, drop: 'old' }, and given that both _curSources and _queue are full and I add another one, I would want to:

  1. drop the eldest item out of _curSources
  2. promote the eldest item from _queue
  3. add_ the new_ stream to the _queue

But, I'll need to think about it more. However, I think the same concern is valid for the overlapping side of the branch. If there is a queue, then one should add to _queue and let _removeOldest pull it out of the queue, but that doesn't really resolve the actual use-case for using overlapping to begin with. I think I need to step back and think about it harder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right! I didn't realize that _removeOldest pulls from the queue. Replacing this._add -> this._addToCur isn't equivalent then indeed. I'll think about it too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't test it, but maybe we could do something like this?

_add(obj, toObs /* Function | falsey */, allowOverflow) {
  toObs = toObs || id;
  if (this._concurLim === -1 || this._curSources.length < this._concurLim) {
    this._addToCur(toObs(obj));
  } else {
    if (this._queueLim === -1 || this._queue.length < this._queueLim || allowOverflow) {
      this._addToQueue(toObs(obj));
    } else if (this._drop === 'old') {
      if (this._overlapping) {
        this._add(obj, toObs, true);
        this._removeOldest();
      } else {
        this._removeOldest();
        this._add(obj, toObs);
      }
    }
  }
}

} else {
this._removeOldest();
this._add(obj, toObs);
}
}
}
},
Expand Down
11 changes: 11 additions & 0 deletions src/many-sources/flat-map-latest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import FlatMap from './flat-map';

export default function flatMapLatest(obs, fn, options) {
if (typeof fn !== 'function') {
options = fn;
fn = undefined;
}
options = options === undefined ? {} : options;
const { overlapping = false } = options;
return new FlatMap(obs, fn, {concurLim: 1, drop: 'old', overlapping }).setName(obs, 'flatMapLatest');
}
38 changes: 38 additions & 0 deletions test/specs/flat-map-latest.coffee
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{stream, prop, send, activate, deactivate, Kefir} = require('../test-helpers.coffee')
sinon = require('sinon')


describe 'flatMapLatest', ->
Expand Down Expand Up @@ -102,3 +103,40 @@ describe 'flatMapLatest', ->
a = send(prop(), [0])
b = send(prop(), [a])
expect(b.flatMapLatest()).toEmit [{current: 0}]


describe 'non-overlapping', ->
it 'should remove the previous stream before adding the next', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest()
activate(map)
send(b, [a])
send(b, [a])
deactivate(map)
expect(onDeactivate.callCount).toBe(2)


describe 'overlapping', ->
it 'should add the next stream before removing the previous', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest({ overlapping: true })
activate(map)
send(b, [a])
send(b, [a])
deactivate(map)
expect(onDeactivate.callCount).toBe(1)

it 'should accept optional map fn', ->
onDeactivate = sinon.spy()
a = Kefir.stream(-> onDeactivate)
b = stream()
map = b.flatMapLatest(((x) -> x.obs), { overlapping: true })
activate(map)
send(b, [{ obs: a }])
send(b, [{ obs: a }])
deactivate(map)
expect(onDeactivate.callCount).toBe(1)