Skip to content

Commit

Permalink
fix: Fixed error removing already removed listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
Rapsssito committed Feb 18, 2020
1 parent bf05e66 commit 7712876
Showing 1 changed file with 72 additions and 14 deletions.
86 changes: 72 additions & 14 deletions src/TcpSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,39 @@ const STATE = {
CONNECTED: 2,
};

class RemovableListener {
/**
* @param {import("react-native").EmitterSubscription} listener
* @param {import("react-native").NativeEventEmitter} eventEmitter
*/
constructor(listener, eventEmitter) {
this._listener = listener;
this._eventEmitter = eventEmitter;
this._removed = false;
}

isRemoved() {
return this._removed;
}

remove() {
this._eventEmitter.removeSubscription(this._listener);
this._removed = true;
}
}

export default class TcpSocket {
/**
* Initialices a TcpSocket.
*
* @param {Number} id
* @param {number} id
* @param {import('react-native').NativeEventEmitter} eventEmitter
*/
constructor(id, eventEmitter) {
this._id = id;
this._eventEmitter = eventEmitter;
this._state = STATE.DISCONNECTED;
/** @type {RemovableListener[]} */
this._listeners = [];
}

Expand All @@ -35,10 +57,16 @@ export default class TcpSocket {
*/
on(event, callback, context) {
const newListener = this._selectListener(event, callback, context);
this._listeners.push(newListener);
return newListener;
const removableListener = new RemovableListener(newListener, this._eventEmitter);
this._listeners.push(removableListener);
return removableListener;
}

/**
* @param {string} event
* @param {function(any):void} callback
* @param {any} [context]
*/
_selectListener(event, callback, context) {
switch (event) {
case 'data':
Expand Down Expand Up @@ -78,6 +106,10 @@ export default class TcpSocket {
);
}

/**
* @param {{ host: string; port: number; timeout: number; }} options
* @param {(address: string) => void} [callback]
*/
connect(options, callback) {
this._registerEvents();
// Normalize args
Expand All @@ -95,6 +127,10 @@ export default class TcpSocket {
return this;
}

/**
* @param {number} msecs
* @param {() => void} [wrapper]
*/
_activeTimer(msecs, wrapper) {
if (this._timeout && this._timeout.handle) clearTimeout(this._timeout.handle);

Expand All @@ -120,12 +156,16 @@ export default class TcpSocket {
}
}

/**
* @param {number} msecs
* @param {{ (...args: any[]): any; (...args: any[]): any; }} [callback]
*/
setTimeout(msecs, callback) {
if (msecs === 0) {
this._clearTimeout();
if (callback) this._eventEmitter.removeListener('timeout', callback);
} else {
if (callback) this._eventEmitter.once('timeout', callback);
if (callback) this._eventEmitter.once('timeout', callback, this);

this._activeTimer(msecs);
}
Expand All @@ -136,6 +176,10 @@ export default class TcpSocket {
return this._address;
}

/**
* @param {string | Buffer | Uint8Array} data
* @param {BufferEncoding} [encoding]
*/
end(data, encoding) {
if (this._destroyed) return;
if (data) this.write(data, encoding);
Expand All @@ -153,14 +197,18 @@ export default class TcpSocket {

_registerEvents() {
this.on('connect', (ev) => this._onConnect(ev.address));
this.on('close', (ev) => this._onClose(ev.hadError));
this.on('error', (ev) => this._onError(ev.error));
this.on('close', () => this._onClose());
this.on('error', () => this._onError());
}

_unregisterEvents() {
this._listeners.forEach((listener) => listener.remove());
this._listeners.forEach((listener) => (listener.isRemoved() ? listener.remove() : null));
this._listeners = [];
}

/**
* @param {string} address
*/
_onConnect(address) {
this.setConnected(address);
}
Expand All @@ -176,8 +224,8 @@ export default class TcpSocket {
/**
*
* @param {string | Buffer | Uint8Array} buffer
* @param {string} encoding
* @param {Function} callback
* @param {BufferEncoding} [encoding]
* @param {(error?: string) => void} [callback]
*/
write(buffer, encoding, callback) {
const self = this;
Expand All @@ -193,13 +241,23 @@ export default class TcpSocket {
`Invalid data, chunk must be a string or buffer, not ${typeof buffer}`
);

Sockets.write(this._id, str, function(err) {
if (self._timeout) self._activeTimer(self._timeout.msecs);
if (err) return callback(err);
callback();
});
Sockets.write(
this._id,
str,
/**
* @param {string} err
*/
function(err) {
if (self._timeout) self._activeTimer(self._timeout.msecs);
if (err) return callback(err);
callback();
}
);
}

/**
* @param {string} address
*/
setConnected(address) {
this._state = STATE.CONNECTED;
this._address = address;
Expand Down

0 comments on commit 7712876

Please sign in to comment.