Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the Observable spec #673

Merged
merged 19 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions lib/createObserver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
var isFunction = require('./isFunction');

/**
* Coerce observer callbacks into observer object or return observer object
* if already created. Will throw an error if both an object and callback args
* are provided.
*
* @id createObserver
* @param {function|object} onNext Function to receive new values or observer
* @param {function} [onError] Optional callback to receive errors.
* @param {function} [onComplete] Optional callback when stream completes
* @return {object} Observer object with next, error, and complete methods
* @private
*
* createObserver(
* function (x) { console.log(x); },
* function (err) { console.error(err); },
* function () { console.log('done'); }
* )
*
* createObserver(
* null,
* null,
* function () { console.log('done'); }
* )
*
* createObserver({
* next: function (x) { console.log(x); },
* error: function (err) { console.error(err); },
* complete: function () { console.log('done'); }
* })
*/
function createObserver (onNext, onError, onComplete) {
var isObserver = onNext && !isFunction(onNext) && typeof onNext === 'object';

// ensure if we have an observer that we don't also have callbacks. Users
// must choose one.
if (isObserver && (onError || onComplete)) {
throw new Error('Subscribe requires either an observer object or optional callbacks.');
}

// onNext is actually an observer
if (isObserver) {
return onNext;
}

// Otherwise create an observer object
return {
next: onNext,
error: onError,
complete: onComplete,
};
}

module.exports = createObserver;
22 changes: 22 additions & 0 deletions lib/global.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
jaidetree marked this conversation as resolved.
Show resolved Hide resolved
* Return a global context upon which to install Highland globals. Takes a
* default namespace to use if both the node global and browser window
* namespace cannot be found.
*
* @returns {object} Global namespace context
*/

// Use the nodejs global namespace
if (typeof global !== 'undefined') {
module.exports = global;
}
// Use the browser window namespace
else if (typeof window !== 'undefined') {
module.exports = window;
}
// If neither the global namespace or browser namespace is avaiable
// Use this module as the default context
else {
module.exports = this;
}

112 changes: 98 additions & 14 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var Decoder = require('string_decoder').StringDecoder;

var createObserver = require('./createObserver');
var isFunction = require('./isFunction');
var IntMap = require('./intMap');
var ObservableSubscription = require('./observableSubscription');
var Queue = require('./queue');
var ReadableProxy = require('./readableProxy');

Expand All @@ -21,13 +24,7 @@ var slice = Array.prototype.slice;
var hasOwn = Object.prototype.hasOwnProperty;

// Set up the global object.
var _global = this;
if (typeof global !== 'undefined') {
_global = global;
}
else if (typeof window !== 'undefined') {
_global = window;
}
var _global = require('./global');

// ES5 detected value, used for switch between ES5 and ES3 code
var isES5 = (function () {
Expand Down Expand Up @@ -317,9 +314,7 @@ _.isUndefined = function (x) {
return typeof x === 'undefined';
};

_.isFunction = function (x) {
return typeof x === 'function';
};
_.isFunction = isFunction;

_.isObject = function (x) {
return typeof x === 'object' && x !== null;
Expand Down Expand Up @@ -382,10 +377,7 @@ else {

// set up a global nil object in cases where you have multiple Highland
// instances installed (often via npm)
if (!_global.nil) {
_global.nil = {};
}
var nil = _.nil = _global.nil;
var nil = _.nil = require('./nil');

/**
* Transforms a function with specific arity (all arguments must be
Expand Down Expand Up @@ -2109,6 +2101,98 @@ addMethod('toPromise', function (PromiseCtor) {
});


/**
* Consumes values using the Observable subscribe signature. Unlike other
* consumption methods, subscribe can be called multiple times. Each
* subscription will receive the current value before receiving the next value.
* Subscribing to an already consumed stream will result in an error.
*
* Implements the Observable subscribe functionality as defined by the spec:
* https://tc39.github.io/proposal-observable/#observable-prototype-subscribe
*
* @id subscribe
* @section Consumption
* @name Stream.subscribe(onNext, onError, onComplete)
* @param {Function|object|null} onNext - Handler for next value or observer
* @param {Function|null} onError - Handler function for errors.
* @param {Function|null} onCompleted - Handler Function when stream is done.
* @returns {ObservableSubscription} - Subscription with unsubscribed method
* @api public
*
* // with callbacks
* _([1, 2, 3, 4]).subscribe(
* function onNext (x) {
* // Called for each value that comes downstream
* console.log('Received onNext value', x);
* },
* function onError (err) {
* // Called one time with error or zero if no errors occur upstream
* console.error('Single highland stream error', err);
* },
* function onComplete () {
* // Receives no arguments
* // Called only once when stream is completed.
* console.log('Completed!');
* }
* );
*
* // with an observer
* _([1, 2, 3, 4]).subscribe({
* next (x) {
* console.log('Received next value', x);
* },
* error (err) {
* console.error('An error occurred upstream', err);
* },
* complete () {
* console.log('Completed!')
* }
* });
*/

addMethod('subscribe', function (onNext, onError, onComplete) {
var observer = createObserver(onNext, onError, onComplete);

return new ObservableSubscription(this, observer);
});

/*
* Create a variable we can use as a dynamic method name depending on the
* environment.
*
* If Symbols are available get the observable symbol. Otherwise use the a
* fallback string.
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* Source taken from RxJS
* https://github.com/ReactiveX/rxjs/commit/4a5aaafc99825ae9b61e410bc0b5e86c7ae75837#diff-d26bc4881b94c82f3c0ae7d3914e9577R13
*/
/* eslint-disable no-undef */
var observable = typeof Symbol === 'function' && Symbol.observable || '@@observable';
/* eslint-enable no-undef */

/**
* Returns an Observable spec-compliant instance (itself) that has a subscribe
* method and a Symbol.observable method. If Symbol is not available in the
* current environment it defaults to '@@observable'. Used by other tools and
* libraries that want to get an observable spec compliant stream interface.
*
* https://tc39.github.io/proposal-observable/#observable-prototype-@@observable
*
* @id Symbol.observable
* @section Consumption
* @name Symbol.observable
* @api public
*
* _([1, 2, 3])[Symbol.observable || "@@observable"]().subscribe(x => {
* console.log("Received value", x);
* });
*/

addMethod(observable, function () {
return this;
});

/**
* Converts the stream to a node Readable Stream for use in methods
* or pipes that depend on the native stream type.
Expand Down
14 changes: 14 additions & 0 deletions lib/isFunction.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Predicate function takes a value and returns true if it is a function.
*
* @id isFunction
* @name isFunction(x)
* @param {any} x - Any value to test against
* @returns {bool} True if x is a function
*/

function isFunction (x) {
return typeof x === 'function';
}

module.exports = isFunction;
21 changes: 21 additions & 0 deletions lib/nil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var _global = require('./global');

/*
* Resolve nil value from global namespace if it exists. This may happen when
* there are multiple versions of highland (like npm).
*
* nil is only equal to itself:
*
* nil === {} => false
* nil === nil => true
*
* This property makes it valuable for determining a lack of input from a
* falsey value such as nil or undefined. When a highland stream encounters
* nil it knows for sure the intention is to end the stream.
*/

if (!_global.nil) {
_global.nil = {};
}

module.exports = _global.nil;
106 changes: 106 additions & 0 deletions lib/observableSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
var nil = require('./nil');

/**
* An implementation of the TC39 Subscription object
* https://tc39.github.io/proposal-observable/#subscription-objects
*
* This class is intended for internal use only.
*
* Constructor takes a source highland stream, and an observer object with
* an optional next, error, and complete methods.
*
* Returns a subscription object with a closed boolean and unsubscribe
* method.
*
* @id ObservableSubscription
* @name ObservableSubscription
* @param {stream} stream - Highland stream to subscribe to
* @param {object} observer - Observer to publish from stream subscription
* @api private
*/
function ObservableSubscription (stream, observer) {
var self = this;

// Set attributes
this._source = stream.fork();
this.closed = false;
jaidetree marked this conversation as resolved.
Show resolved Hide resolved

// Don't let users subscribe to an already completed stream
if (stream.ended) {
if (observer.error) {
observer.error(new Error('Subscribe called on an already completed stream.'));
}

this._cleanup();

return;
}

// Consume the stream and emit data to the observer
this._source = this._source.consume(function (err, x, push, next) {
if (err) {
push(null, nil);
if (observer.error) {
observer.error(err);
}
self._cleanup();
}
else if (x === nil) {
if (observer.complete) {
observer.complete();
}
self._cleanup();
}
else {
if (observer.next) {
observer.next(x);
}
next();
}
});

this._source.resume();
}

// Instance Methods

/**
* Perform cleanup routine on a subscription. This can only be called once per
* subscription. Once its closed the subscription cannot be cleaned up again.
*
* Note: This relies heavily upon side-effects and mutates itself.
*
* @id ObservableSubscription.prototype._cleanup(subscription)
* @name ObservableSubscription.prototype._cleanup
* @returns {undefined} Side-effectful function cleans up subscription
* @api private
*/

ObservableSubscription.prototype._cleanup = function cleanup () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}
this._source = null;
this.closed = true;
};

/**
* Destroy the stream resources and cleanup the subscription.
* @id ObservableSubscription.prototype.unsubscribe()
* @name ObservableSubscription.prototype.unsubscribe()
* @returns {undefined} Side-effectful. Destroys stream and cleans up subscription.
* @api private
*/

ObservableSubscription.prototype.unsubscribe = function unsubscribe () {
// Don't want to destroy\cleanup an already closed stream
if (this.closed) {
return;
}

this._source.destroy();
this._cleanup();
};

module.exports = ObservableSubscription;
Loading