diff --git a/src/datastores/mysql.js b/src/datastores/mysql.js index ea80c6d..8421013 100644 --- a/src/datastores/mysql.js +++ b/src/datastores/mysql.js @@ -22,6 +22,7 @@ const pRetry = require('p-retry') * @param {string} user * @param {string} password * @param {string} database + * @param {number} [connectionLimit = 20] * @param {boolean} [insecureAuth = true] * @param {boolean} [multipleStatements = true] */ @@ -35,12 +36,13 @@ class Mysql { * * @param {MySqlOptions} options */ - constructor ({ host, user, password, database, insecureAuth = true, multipleStatements = true }) { + constructor ({ host, user, password, database, connectionLimit = 20, insecureAuth = true, multipleStatements = true }) { this.options = { host, user, password, database, + connectionLimit, insecureAuth, multipleStatements } @@ -67,12 +69,12 @@ class Mysql { * Closes Database connection */ stop () { - this.conn.end() + this.pool.end() } async reset () { await new Promise((resolve, reject) => { - this.conn.query(` + this.pool.query(` DROP TABLE IF EXISTS cookie; DROP TABLE IF EXISTS registration; `, (err) => { @@ -91,7 +93,7 @@ class Mysql { */ gc () { return new Promise((resolve, reject) => { - this.conn.query('DELETE FROM registration WHERE expiration <= NOW()', + this.pool.query('DELETE FROM registration WHERE expiration <= UNIX_TIMESTAMP(NOW())', (err, res) => { if (err) { return reject(err) @@ -119,12 +121,12 @@ class Mysql { this._registeringPeer.set(id, peerOps) return new Promise((resolve, reject) => { - this.conn.query('INSERT INTO ?? SET ?', + this.pool.query('INSERT INTO ?? SET ?', ['registration', { namespace, peer_id: id, signed_peer_record: Buffer.from(signedPeerRecord), - expiration: new Date(Date.now() + ttl) + expiration: (Date.now() + ttl) / 1000 // Epoch in seconds like MySQL }], (err) => { // Remove Operation peerOps.delete(opId) @@ -153,7 +155,7 @@ class Mysql { async getRegistrations (namespace, { limit = 10, cookie } = {}) { if (cookie) { const cookieEntries = await new Promise((resolve, reject) => { - this.conn.query( + this.pool.query( 'SELECT * FROM cookie WHERE id = ? LIMIT 1', [cookie], (err, results) => { @@ -179,9 +181,9 @@ class Mysql { } const results = await new Promise((resolve, reject) => { - this.conn.query( + this.pool.query( `SELECT id, namespace, peer_id, signed_peer_record, expiration FROM registration r - WHERE namespace = ? AND expiration >= NOW() ${cookieWhereNotExists()} + WHERE namespace = ? AND expiration >= UNIX_TIMESTAMP(NOW()) ${cookieWhereNotExists()} ORDER BY expiration DESC LIMIT ?`, [namespace, cookie || limit, limit], @@ -205,14 +207,15 @@ class Mysql { // Store in cookies if results available await new Promise((resolve, reject) => { - this.conn.query( + this.pool.query( `INSERT INTO ?? (id, namespace, reg_id) VALUES ${results.map((entry) => - `(${this.conn.escape(cookie)}, ${this.conn.escape(entry.namespace)}, ${this.conn.escape(entry.id)})` + `(${this.pool.escape(cookie)}, ${this.pool.escape(entry.namespace)}, ${this.pool.escape(entry.id)})` )}`, ['cookie'] , (err) => { if (err) { return reject(err) } + // @ts-ignore resolve() }) }) @@ -238,7 +241,7 @@ class Mysql { const id = peerId.toB58String() return new Promise((resolve, reject) => { - this.conn.query('SELECT COUNT(1) FROM registration WHERE peer_id = ?', + this.pool.query('SELECT COUNT(1) FROM registration WHERE peer_id = ?', [id], (err, res) => { if (err) { @@ -275,7 +278,7 @@ class Mysql { const id = peerId.toB58String() return new Promise((resolve, reject) => { - this.conn.query('DELETE FROM registration WHERE peer_id = ? AND namespace = ?', [id, ns], + this.pool.query('DELETE FROM registration WHERE peer_id = ? AND namespace = ?', [id, ns], (err, res) => { if (err) { return reject(err) @@ -295,7 +298,7 @@ class Mysql { const id = peerId.toB58String() return new Promise((resolve, reject) => { - this.conn.query('DELETE FROM registration WHERE peer_id = ?', [id], + this.pool.query('DELETE FROM registration WHERE peer_id = ?', [id], (err, res) => { if (err) { return reject(err) @@ -311,16 +314,16 @@ class Mysql { * @returns {Promise} */ _initDB () { - this.conn = mysql.createConnection(this.options) + this.pool = mysql.createPool(this.options) return new Promise((resolve, reject) => { - this.conn.query(` + this.pool.query(` CREATE TABLE IF NOT EXISTS registration ( id INT UNSIGNED NOT NULL AUTO_INCREMENT, namespace varchar(255) NOT NULL, peer_id varchar(255) NOT NULL, signed_peer_record blob NOT NULL, - expiration timestamp NOT NULL, + expiration BIGINT NOT NULL, PRIMARY KEY (id), INDEX (namespace, expiration, peer_id) ); diff --git a/test/server.spec.js b/test/server.spec.js index b328cf9..680856b 100644 --- a/test/server.spec.js +++ b/test/server.spec.js @@ -329,9 +329,6 @@ describe('rendezvous server', () => { // wait for firt record to be removed (2nd gc) await pWaitFor(() => spy.callCount >= 2) - r = await rServer.getRegistrations(testNamespace) - expect(r.registrations).to.have.lengthOf(1) - // wait for second record to be removed await pRetry(async () => { r = await rServer.getRegistrations(testNamespace)