From 65c84ea2ed1d64dbcf81a2b98943ec07ef8f8001 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Tue, 6 Oct 2015 10:03:53 +0300 Subject: [PATCH] feat(shareReplay): add the shareReplay() operator Add shareReplay() as a core operator. Vaguely relates to issue #439 and issue #298. --- .../operators/share-replay.js | 20 +++++ spec/operators/shareReplay-spec.js | 85 +++++++++++++++++++ src/CoreOperators.ts | 1 + src/Rx.KitchenSink.ts | 5 +- src/Rx.ts | 5 +- src/operators/shareReplay.ts | 9 ++ 6 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 perf/micro/immediate-scheduler/operators/share-replay.js create mode 100644 spec/operators/shareReplay-spec.js create mode 100644 src/operators/shareReplay.ts diff --git a/perf/micro/immediate-scheduler/operators/share-replay.js b/perf/micro/immediate-scheduler/operators/share-replay.js new file mode 100644 index 0000000000..3d9fddb07e --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/share-replay.js @@ -0,0 +1,20 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldShareReplayWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) + .shareReplay(3); + var newShareReplayWithImmediateScheduler = RxNew.Observable.range(0, 25) + .shareReplay(3); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old shareReplay with immediate scheduler', function () { + oldShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new shareReplay with immediate scheduler', function () { + newShareReplayWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/shareReplay-spec.js b/spec/operators/shareReplay-spec.js new file mode 100644 index 0000000000..42bbdeb4c7 --- /dev/null +++ b/spec/operators/shareReplay-spec.js @@ -0,0 +1,85 @@ +/* globals describe, expect, it, hot, cold, expectObservable */ + +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.shareReplay()', function () { + it('should share a single subscription', function () { + var subscriptionCount = 0; + var obs = new Observable(function (observer) { + subscriptionCount++; + }); + + var source = obs.shareReplay(1); + + expect(subscriptionCount).toBe(0); + + source.subscribe(); + source.subscribe(); + + expect(subscriptionCount).toBe(1); + }); + + it('should replay as many events as specified by the bufferSize', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + }); + + var hot = source.shareReplay(2); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results1.push(x); + }); + + expect(results1).toEqual([1, 2, 3, 4]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([1, 2, 3, 4]); + expect(results2).toEqual([3, 4]); + expect(subscriptions).toBe(1); + done(); + }); + + it('should not change the output of the observable when successful', function () { + var e1 = hot('---a--^--b-c--d--e--|'); + var expected = '---b-c--d--e--|'; + + expectObservable(e1.shareReplay(1)).toBe(expected); + }); + + it('should not change the output of the observable when error', function () { + var e1 = hot('---a--^--b-c--d--e--#'); + var expected = '---b-c--d--e--#'; + + expectObservable(e1.shareReplay(1)).toBe(expected); + }); + + it('should not change the output of the observable when never', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.shareReplay(1)).toBe(expected); + }); + + it('should not change the output of the observable when empty', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.shareReplay(1)).toBe(expected); + }); +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 74376cd2b5..db2feaeb28 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -54,6 +54,7 @@ export interface CoreOperators { sampleTime?: (delay: number, scheduler?: Scheduler) => Observable; scan?: (project: (acc: R, x: T) => R, acc?: R) => Observable; share?: () => Observable; + shareReplay?: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => Observable; single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable; skip?: (count: number) => Observable; skipUntil?: (notifier: Observable) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 26a6860bc9..5d9469ee02 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -66,7 +66,7 @@ Observable.zip = zipStatic -// Operators +// Operators const observableProto = (>Observable.prototype); import buffer from './operators/buffer'; @@ -227,6 +227,9 @@ observableProto.scan = scan; import share from './operators/share'; observableProto.share = share; +import shareReplay from './operators/shareReplay'; +observableProto.shareReplay = shareReplay; + import single from './operators/single'; observableProto.single = single; diff --git a/src/Rx.ts b/src/Rx.ts index d9ce5a24ad..4ca4c6a2ee 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -56,7 +56,7 @@ Observable.zip = zipStatic -// Operators +// Operators import { CoreOperators } from './CoreOperators'; const observableProto = (>Observable.prototype); @@ -203,6 +203,9 @@ observableProto.scan = scan; import share from './operators/share'; observableProto.share = share; +import shareReplay from './operators/shareReplay'; +observableProto.shareReplay = shareReplay; + import single from './operators/single'; observableProto.single = single; diff --git a/src/operators/shareReplay.ts b/src/operators/shareReplay.ts new file mode 100644 index 0000000000..d00327ea56 --- /dev/null +++ b/src/operators/shareReplay.ts @@ -0,0 +1,9 @@ +import Observable from '../Observable'; +import Scheduler from '../Scheduler'; +import publishReplay from './publishReplay'; + +export default function shareReplay(bufferSize: number = Number.POSITIVE_INFINITY, + windowTime: number = Number.POSITIVE_INFINITY, + scheduler?: Scheduler) { + return publishReplay.call(this, bufferSize, windowTime, scheduler).refCount(); +}