Skip to content

Commit

Permalink
[feature] Add support for Blob
Browse files Browse the repository at this point in the history
Closes #2206
  • Loading branch information
lpinca committed Jun 11, 2024
1 parent ddfe4a8 commit b90a27d
Show file tree
Hide file tree
Showing 10 changed files with 755 additions and 89 deletions.
9 changes: 6 additions & 3 deletions doc/ws.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ is a noop if the ready state is `CONNECTING` or `CLOSED`.

### websocket.ping([data[, mask]][, callback])

- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
- `data`
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
data to send in the ping frame.
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
`true` when `websocket` is not a server client.
Expand All @@ -550,7 +551,8 @@ Send a ping. This method throws an error if the ready state is `CONNECTING`.

### websocket.pong([data[, mask]][, callback])

- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
- `data`
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
data to send in the pong frame.
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
`true` when `websocket` is not a server client.
Expand Down Expand Up @@ -588,7 +590,8 @@ only removes listeners added with

### websocket.send(data[, options][, callback])

- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
- `data`
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
data to send. `Object` values are only supported if they conform to the
requirements of [`Buffer.from()`][]. If those constraints are not met, a
`TypeError` is thrown.
Expand Down
8 changes: 7 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
'use strict';

const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments'];
const hasBlob = typeof Blob !== 'undefined';

if (hasBlob) BINARY_TYPES.push('blob');

module.exports = {
BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'],
BINARY_TYPES,
EMPTY_BUFFER: Buffer.alloc(0),
GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
hasBlob,
kForOnEventAttribute: Symbol('kIsForOnEventAttribute'),
kListener: Symbol('kListener'),
kStatusCode: Symbol('status-code'),
Expand Down
2 changes: 2 additions & 0 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ class Receiver extends Writable {
data = concat(fragments, messageLength);
} else if (this._binaryType === 'arraybuffer') {
data = toArrayBuffer(concat(fragments, messageLength));
} else if (this._binaryType === 'blob') {
data = new Blob(fragments);
} else {
data = fragments;
}
Expand Down
191 changes: 145 additions & 46 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ const { Duplex } = require('stream');
const { randomFillSync } = require('crypto');

const PerMessageDeflate = require('./permessage-deflate');
const { EMPTY_BUFFER } = require('./constants');
const { isValidStatusCode } = require('./validation');
const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
const { isBlob, isValidStatusCode } = require('./validation');
const { mask: applyMask, toBuffer } = require('./buffer-util');

const kByteLength = Symbol('kByteLength');
Expand All @@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024;
let randomPool;
let randomPoolPointer = RANDOM_POOL_SIZE;

const DEFAULT = 0;
const DEFLATING = 1;
const GET_BLOB_DATA = 2;

/**
* HyBi Sender implementation.
*/
Expand All @@ -42,8 +46,10 @@ class Sender {
this._compress = false;

this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];
this._state = DEFAULT;
this.onerror = NOOP;
this[kWebSocket] = undefined;
}

/**
Expand Down Expand Up @@ -205,7 +211,7 @@ class Sender {
rsv1: false
};

if (this._deflating) {
if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, buf, false, options, cb]);
} else {
this.sendFrame(Sender.frame(buf, options), cb);
Expand All @@ -227,6 +233,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
Expand All @@ -248,7 +257,13 @@ class Sender {
rsv1: false
};

if (this._deflating) {
if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, false, options, cb]);
} else {
this.getBlobData(data, false, options, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, false, options, cb]);
} else {
this.sendFrame(Sender.frame(data, options), cb);
Expand All @@ -270,6 +285,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
Expand All @@ -291,7 +309,13 @@ class Sender {
rsv1: false
};

if (this._deflating) {
if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, false, options, cb]);
} else {
this.getBlobData(data, false, options, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, false, options, cb]);
} else {
this.sendFrame(Sender.frame(data, options), cb);
Expand Down Expand Up @@ -325,6 +349,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
Expand Down Expand Up @@ -352,40 +379,107 @@ class Sender {

if (options.fin) this._firstFragment = true;

if (perMessageDeflate) {
const opts = {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1
};

if (this._deflating) {
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
const opts = {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1
};

if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
} else {
this.dispatch(data, this._compress, opts, cb);
this.getBlobData(data, this._compress, opts, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
} else {
this.sendFrame(
Sender.frame(data, {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1: false
}),
cb
);
this.dispatch(data, this._compress, opts, cb);
}
}

/**
* Calls queued callbacks with an error.
*
* @param {Error} err The error to call the callbacks with
* @param {Function} [cb] The first callback
* @private
*/
callCallbacks(err, cb) {
if (typeof cb === 'function') cb(err);

for (let i = 0; i < this._queue.length; i++) {
const params = this._queue[i];
const callback = params[params.length - 1];

if (typeof callback === 'function') callback(err);
}
}

/**
* Gets the contents of a blob as binary data.
*
* @param {Blob} blob The blob
* @param {Boolean} [compress=false] Specifies whether or not to compress
* the data
* @param {Object} options Options object
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
* `data`
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @param {Function} [cb] Callback
* @private
*/
getBlobData(blob, compress, options, cb) {
this._bufferedBytes += options[kByteLength];
this._state = GET_BLOB_DATA;

blob
.arrayBuffer()
.then((arrayBuffer) => {
if (this._socket.destroyed) {
const err = new Error(
'The socket was closed while the blob was being read'
);

this.callCallbacks(err, cb);
return;
}

this._bufferedBytes -= options[kByteLength];
const data = toBuffer(arrayBuffer);

if (!compress) {
this._state = DEFAULT;
this.sendFrame(Sender.frame(data, options), cb);
this.dequeue();
} else {
this.dispatch(data, compress, options, cb);
}
})
.catch((err) => {
//
// `onError` is called in the next tick to not suppress the throwing
// behavior of the `'error'` event emitted by the `WebSocket` object.
//
process.nextTick(onError, this, err, cb);
});
}

/**
* Dispatches a message.
*
Expand Down Expand Up @@ -418,27 +512,19 @@ class Sender {
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

this._bufferedBytes += options[kByteLength];
this._deflating = true;
this._state = DEFLATING;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
if (this._socket.destroyed) {
const err = new Error(
'The socket was closed while data was being compressed'
);

if (typeof cb === 'function') cb(err);

for (let i = 0; i < this._queue.length; i++) {
const params = this._queue[i];
const callback = params[params.length - 1];

if (typeof callback === 'function') callback(err);
}

this.callCallbacks(err, cb);
return;
}

this._bufferedBytes -= options[kByteLength];
this._deflating = false;
this._state = DEFAULT;
options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this.dequeue();
Expand All @@ -451,7 +537,7 @@ class Sender {
* @private
*/
dequeue() {
while (!this._deflating && this._queue.length) {
while (this._state === DEFAULT && this._queue.length) {
const params = this._queue.shift();

this._bufferedBytes -= params[3][kByteLength];
Expand Down Expand Up @@ -490,3 +576,16 @@ class Sender {
}

module.exports = Sender;

/**
* Handles a `Sender` error.
*
* @param {Sender} sender The `Sender` instance
* @param {Error} err The error
* @param {Function} [cb] The first pending callback
* @private
*/
function onError(sender, err, cb) {
sender.callCallbacks(err, cb);
sender.onerror(err);
}
21 changes: 21 additions & 0 deletions lib/validation.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const { isUtf8 } = require('buffer');

const { hasBlob } = require('./constants');

//
// Allowed token characters:
//
Expand Down Expand Up @@ -107,7 +109,26 @@ function _isValidUTF8(buf) {
return true;
}

/**
* Determines whether a value is a `Blob`.
*
* @param {*} value The value to be tested
* @return {Boolean} `true` if `value` is a `Blob`, else `false`
* @private
*/
function isBlob(value) {
return (
hasBlob &&
typeof value === 'object' &&
typeof value.arrayBuffer === 'function' &&
typeof value.type === 'string' &&
typeof value.stream === 'function' &&
value[Symbol.toStringTag] === 'Blob'
);
}

module.exports = {
isBlob,
isValidStatusCode,
isValidUTF8: _isValidUTF8,
tokenChars
Expand Down
Loading

0 comments on commit b90a27d

Please sign in to comment.