Skip to content

Commit

Permalink
feat(shareReplay): add config parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Jan 7, 2019
1 parent 41f05ae commit 56bae3b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
22 changes: 18 additions & 4 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ describe('shareReplay operator', () => {
expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called;
});

it('should not restart if refCount hits 0 due to unsubscriptions', () => {
it('should not restart due to unsubscriptions if refCount is false', () => {
const results: number[] = [];
const source = interval(10, rxTestScheduler).pipe(
take(10),
shareReplay(1)
shareReplay(1, undefined, { refCount: false })
);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
Expand All @@ -187,11 +187,11 @@ describe('shareReplay operator', () => {
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('should restart if refCount hits 0 due to unsubscriptions', () => {
it('should restart due to unsubscriptions if refCount is true', () => {
const results: number[] = [];
const source = interval(10, rxTestScheduler).pipe(
take(10),
shareReplay(1)
shareReplay(1, undefined, { refCount: true })
);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
Expand All @@ -201,6 +201,20 @@ describe('shareReplay operator', () => {
expect(results).to.deep.equal([0, 1, 2, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
});

it('should default to refCount being false', () => {
const results: number[] = [];
const source = interval(10, rxTestScheduler).pipe(
take(10),
shareReplay(1)
);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54);

rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('should not break lift() composability', (done: MochaDone) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
Expand Down
52 changes: 44 additions & 8 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ import { ReplaySubject } from '../ReplaySubject';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';
import { isScheduler } from '../util/isScheduler';

export interface ShareReplayConfig {
refCount: boolean;
}

export const defaultShareReplayConfig: ShareReplayConfig = {
refCount: false
};

/**
* Share source and replay specified number of emissions on subscription.
Expand Down Expand Up @@ -39,25 +48,51 @@ import { Subscriber } from '../Subscriber';
*
* @param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
* @param {Number} [windowTime=Number.POSITIVE_INFINITY] Maximum time length of the replay buffer in milliseconds.
* @param {Object} config a configuration object to define `refCount` behavior. Defaults
* to `{ refCount: false }`.
* @param {Scheduler} [scheduler] Scheduler where connected observers within the selector function
* will be invoked on.
* @return {Observable} An observable sequence that contains the elements of a sequence produced
* by multicasting the source sequence within a selector function.
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(
bufferSize?: number,
windowTime?: number,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
bufferSize?: number,
windowTime?: number,
config?: ShareReplayConfig,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
configOrScheduler?: ShareReplayConfig | SchedulerLike,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
let config = defaultShareReplayConfig;
if (isScheduler(configOrScheduler)) {
scheduler = configOrScheduler as SchedulerLike;
} else if (configOrScheduler) {
config = configOrScheduler as ShareReplayConfig;
}
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, config, scheduler));
}

function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
let subject: ReplaySubject<T>;
function shareReplayOperator<T>(
bufferSize: number | undefined,
windowTime: number | undefined,
config: ShareReplayConfig,
scheduler: SchedulerLike | undefined
) {
const { refCount: useRefCount } = config;
let subject: ReplaySubject<T> | undefined;
let refCount = 0;
let subscription: Subscription;
let subscription: Subscription | undefined;
let hasError = false;
let isComplete = false;

Expand All @@ -80,13 +115,14 @@ function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, schedu
}

const innerSub = subject.subscribe(this);

return () => {
this.add(() => {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && isComplete) {
if (subscription && !isComplete && useRefCount && refCount === 0) {
subscription.unsubscribe();
subscription = undefined;
subject = undefined;
}
};
});
};
}

0 comments on commit 56bae3b

Please sign in to comment.