From 56bae3b47374a3f575da0ac5411d460bf141adcd Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 28 Aug 2018 12:03:43 +1000 Subject: [PATCH] feat(shareReplay): add config parameter Closes #3336 --- spec/operators/shareReplay-spec.ts | 22 +++++++++--- src/internal/operators/shareReplay.ts | 52 ++++++++++++++++++++++----- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 7c787fd233..3688781e40 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -173,11 +173,11 @@ describe('shareReplay operator', () => { expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called; }); - it('should not restart if refCount hits 0 due to unsubscriptions', () => { + it('should not restart due to unsubscriptions if refCount is false', () => { const results: number[] = []; const source = interval(10, rxTestScheduler).pipe( take(10), - shareReplay(1) + shareReplay(1, undefined, { refCount: false }) ); const subs = source.subscribe(x => results.push(x)); rxTestScheduler.schedule(() => subs.unsubscribe(), 35); @@ -187,11 +187,11 @@ describe('shareReplay operator', () => { expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]); }); - it('should restart if refCount hits 0 due to unsubscriptions', () => { + it('should restart due to unsubscriptions if refCount is true', () => { const results: number[] = []; const source = interval(10, rxTestScheduler).pipe( take(10), - shareReplay(1) + shareReplay(1, undefined, { refCount: true }) ); const subs = source.subscribe(x => results.push(x)); rxTestScheduler.schedule(() => subs.unsubscribe(), 35); @@ -201,6 +201,20 @@ describe('shareReplay operator', () => { expect(results).to.deep.equal([0, 1, 2, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); }); + it('should default to refCount being false', () => { + const results: number[] = []; + const source = interval(10, rxTestScheduler).pipe( + take(10), + shareReplay(1) + ); + const subs = source.subscribe(x => results.push(x)); + rxTestScheduler.schedule(() => subs.unsubscribe(), 35); + rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54); + + rxTestScheduler.flush(); + expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]); + }); + it('should not break lift() composability', (done: MochaDone) => { class MyCustomObservable extends Observable { lift(operator: Operator): Observable { diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index 3d5371fea9..8863dd213e 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -3,6 +3,15 @@ import { ReplaySubject } from '../ReplaySubject'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { Subscriber } from '../Subscriber'; +import { isScheduler } from '../util/isScheduler'; + +export interface ShareReplayConfig { + refCount: boolean; +} + +export const defaultShareReplayConfig: ShareReplayConfig = { + refCount: false +}; /** * Share source and replay specified number of emissions on subscription. @@ -39,6 +48,8 @@ import { Subscriber } from '../Subscriber'; * * @param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer. * @param {Number} [windowTime=Number.POSITIVE_INFINITY] Maximum time length of the replay buffer in milliseconds. + * @param {Object} config a configuration object to define `refCount` behavior. Defaults + * to `{ refCount: false }`. * @param {Scheduler} [scheduler] Scheduler where connected observers within the selector function * will be invoked on. * @return {Observable} An observable sequence that contains the elements of a sequence produced @@ -46,18 +57,42 @@ import { Subscriber } from '../Subscriber'; * @method shareReplay * @owner Observable */ +export function shareReplay( + bufferSize?: number, + windowTime?: number, + scheduler?: SchedulerLike +): MonoTypeOperatorFunction; +export function shareReplay( + bufferSize?: number, + windowTime?: number, + config?: ShareReplayConfig, + scheduler?: SchedulerLike +): MonoTypeOperatorFunction; export function shareReplay( bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, + configOrScheduler?: ShareReplayConfig | SchedulerLike, scheduler?: SchedulerLike ): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler)); + let config = defaultShareReplayConfig; + if (isScheduler(configOrScheduler)) { + scheduler = configOrScheduler as SchedulerLike; + } else if (configOrScheduler) { + config = configOrScheduler as ShareReplayConfig; + } + return (source: Observable) => source.lift(shareReplayOperator(bufferSize, windowTime, config, scheduler)); } -function shareReplayOperator(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) { - let subject: ReplaySubject; +function shareReplayOperator( + bufferSize: number | undefined, + windowTime: number | undefined, + config: ShareReplayConfig, + scheduler: SchedulerLike | undefined +) { + const { refCount: useRefCount } = config; + let subject: ReplaySubject | undefined; let refCount = 0; - let subscription: Subscription; + let subscription: Subscription | undefined; let hasError = false; let isComplete = false; @@ -80,13 +115,14 @@ function shareReplayOperator(bufferSize?: number, windowTime?: number, schedu } const innerSub = subject.subscribe(this); - - return () => { + this.add(() => { refCount--; innerSub.unsubscribe(); - if (subscription && refCount === 0 && isComplete) { + if (subscription && !isComplete && useRefCount && refCount === 0) { subscription.unsubscribe(); + subscription = undefined; + subject = undefined; } - }; + }); }; }