Skip to content

Commit

Permalink
feat(shareReplay): add the shareReplay() operator
Browse files Browse the repository at this point in the history
Add shareReplay() as a core operator. Vaguely relates to issue ReactiveX#439 and
issue ReactiveX#298.
  • Loading branch information
Andre Medeiros committed Oct 6, 2015
1 parent af603bd commit 65c84ea
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 2 deletions.
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/share-replay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldShareReplayWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.shareReplay(3);
var newShareReplayWithImmediateScheduler = RxNew.Observable.range(0, 25)
.shareReplay(3);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old shareReplay with immediate scheduler', function () {
oldShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new shareReplay with immediate scheduler', function () {
newShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
85 changes: 85 additions & 0 deletions spec/operators/shareReplay-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/* globals describe, expect, it, hot, cold, expectObservable */

var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.shareReplay()', function () {
it('should share a single subscription', function () {
var subscriptionCount = 0;
var obs = new Observable(function (observer) {
subscriptionCount++;
});

var source = obs.shareReplay(1);

expect(subscriptionCount).toBe(0);

source.subscribe();
source.subscribe();

expect(subscriptionCount).toBe(1);
});

it('should replay as many events as specified by the bufferSize', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
});

var hot = source.shareReplay(2);

expect(results1).toEqual([]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results1.push(x);
});

expect(results1).toEqual([1, 2, 3, 4]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([1, 2, 3, 4]);
expect(results2).toEqual([3, 4]);
expect(subscriptions).toBe(1);
done();
});

it('should not change the output of the observable when successful', function () {
var e1 = hot('---a--^--b-c--d--e--|');
var expected = '---b-c--d--e--|';

expectObservable(e1.shareReplay(1)).toBe(expected);
});

it('should not change the output of the observable when error', function () {
var e1 = hot('---a--^--b-c--d--e--#');
var expected = '---b-c--d--e--#';

expectObservable(e1.shareReplay(1)).toBe(expected);
});

it('should not change the output of the observable when never', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.shareReplay(1)).toBe(expected);
});

it('should not change the output of the observable when empty', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.shareReplay(1)).toBe(expected);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface CoreOperators<T> {
sampleTime?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
share?: () => Observable<T>;
shareReplay?: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => Observable<T>;
single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
skip?: (count: number) => Observable<T>;
skipUntil?: (notifier: Observable<any>) => Observable<T>;
Expand Down
5 changes: 4 additions & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Observable.zip = zipStatic



// Operators
// Operators
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);

import buffer from './operators/buffer';
Expand Down Expand Up @@ -227,6 +227,9 @@ observableProto.scan = scan;
import share from './operators/share';
observableProto.share = share;

import shareReplay from './operators/shareReplay';
observableProto.shareReplay = shareReplay;

import single from './operators/single';
observableProto.single = single;

Expand Down
5 changes: 4 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Observable.zip = zipStatic



// Operators
// Operators
import { CoreOperators } from './CoreOperators';
const observableProto = (<CoreOperators<any>>Observable.prototype);

Expand Down Expand Up @@ -203,6 +203,9 @@ observableProto.scan = scan;
import share from './operators/share';
observableProto.share = share;

import shareReplay from './operators/shareReplay';
observableProto.shareReplay = shareReplay;

import single from './operators/single';
observableProto.single = single;

Expand Down
9 changes: 9 additions & 0 deletions src/operators/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import Observable from '../Observable';
import Scheduler from '../Scheduler';
import publishReplay from './publishReplay';

export default function shareReplay<T>(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: Scheduler) {
return publishReplay.call(this, bufferSize, windowTime, scheduler).refCount();
}

0 comments on commit 65c84ea

Please sign in to comment.