Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Oct 1, 2024
1 parent c030404 commit d2f7502
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 37 deletions.
3 changes: 3 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ import {
setSpanError,
setSpanErrorAndException,
} from './instrument';
import { GetSession, GetSessionInterface } from './session-getter';

Check failure on line 118 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·GetSession,·GetSessionInterface·` with `GetSession,·GetSessionInterface`

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
Expand Down Expand Up @@ -348,6 +349,7 @@ class Database extends common.GrpcServiceObject {
formattedName_: string;
pool_: SessionPoolInterface;
multiplexedSession_?: MultiplexedSessionInterface;
getSession_?: GetSessionInterface;
queryOptions_?: spannerClient.spanner.v1.ExecuteSqlRequest.IQueryOptions;
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
Expand Down Expand Up @@ -468,6 +470,7 @@ class Database extends common.GrpcServiceObject {
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this)
: new MultiplexedSession(this, multiplexedSessionOptions);
this.getSession_ = new GetSession(this);
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
}
Expand Down
48 changes: 12 additions & 36 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import {EventEmitter} from 'events';
import * as is from 'is';
import PQueue from 'p-queue';
import { GoogleError } from 'google-gax';

Check failure on line 3 in src/multiplexed-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·GoogleError·` with `GoogleError`
import {google} from '../protos/protos';
import {Database} from './database';
import {Session} from './session';
import {Transaction} from './transaction';
import {NormalCallback} from './common';

interface MultiplexedSessionInventory {
multiplexedSession: Session | null;
}
Expand Down Expand Up @@ -40,15 +36,14 @@ export interface MultiplexedSessionOptions {
}

const DEFAULTS: MultiplexedSessionOptions = {
refreshRate: 10,
refreshRate: 30,
concurrency: Infinity,
databaseRole: null,
};

export class MultiplexedSession extends EventEmitter implements MultiplexedSessionInterface {

Check failure on line 44 in src/multiplexed-session.ts

View workflow job for this annotation

GitHub Actions / lint

Replace `·extends·EventEmitter·implements·MultiplexedSessionInterface·` with `⏎··extends·EventEmitter⏎··implements·MultiplexedSessionInterface⏎`
database: Database;
multiplexedSessionOptions: MultiplexedSessionOptions;
multiplexedSessionPromise: Promise<Session> | undefined;
_acquires: PQueue;
_multiplexedInventory!: MultiplexedSessionInventory;
_pingHandle!: NodeJS.Timer;
Expand All @@ -67,7 +62,6 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
this._multiplexedInventory = {
multiplexedSession: null,
};
this.multiplexedSessionPromise = undefined;
this._acquires = new PQueue({
concurrency: 1,
});
Expand Down Expand Up @@ -96,31 +90,12 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
}

async _refresh(): Promise<void> {
this._multiplexedInventory.multiplexedSession?.getMetadata((err, resp) => {
if (err) {
console.error('Error fetching metadata:', err);
return;
}

if (resp?.createTime) {
const seconds = Number(resp.createTime.seconds) ?? 0;
const nanos = resp.createTime.nanos ?? 0;
if (typeof seconds === 'number' && typeof nanos === 'number') {
const timestampMs = seconds * 1000 + nanos / 1000000;
const createdDate = new Date(timestampMs);
const currentDate = new Date(Date.now());
const differenceInMs = Math.abs(
currentDate.getTime() - createdDate.getTime()
);
const differenceInDays = differenceInMs / (1000 * 60 * 60 * 24);
if (differenceInDays >= 7) {
this.createSession();
}
} else {
console.warn('Invalid timestamp values.');
}
}
});
const metadata = await this._multiplexedInventory.multiplexedSession?.getMetadata();

Check failure on line 93 in src/multiplexed-session.ts

View workflow job for this annotation

GitHub Actions / lint

Insert `⏎·····`
const createTime = (parseInt(metadata![0].createTime.seconds) * 1000) + (metadata![0].createTime.nanos / 1000000);
const expireTime = createTime + (7*24*60*60*1000);
if(Date.now() > expireTime) {
this.createSession();
}
}

/**
Expand All @@ -132,7 +107,9 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
this._acquire().then(
session => callback(null, session, session?.txn),
callback
);
).catch(err => {
console.log("err: ", err);
})
}

async _acquire(): Promise<Session | null> {
Expand Down Expand Up @@ -210,10 +187,9 @@ export class MultiplexedSession extends EventEmitter implements MultiplexedSessi
);

try {
const res = await Promise.race(promises);
console.log("the response is: ", res);
await Promise.race(promises);
} catch(err) {
console.log("try is catching error");
console.log("ERROR: ",err);
} finally {
removeOnceCloseListener!();
removeListener!();
Expand Down
58 changes: 58 additions & 0 deletions src/session-getter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { EventEmitter } from "events-intercept";
import { Database, Session, Transaction } from ".";
import { MultiplexedSession, MultiplexedSessionInterface, MultiplexedSessionOptions } from "./multiplexed-session";
import { SessionPool, SessionPoolInterface, SessionPoolOptions } from "./session-pool";
import { MultiplexedSessionConstructor, SessionPoolConstructor } from "./database";

/**
* @callback GetSessionCallback
* @param {?Error} error Request error, if any.
* @param {Session} session The read-write session.
* @param {Transaction} transaction The transaction object.
*/
export interface GetSessionCallback {
(
err: Error | null,
session?: Session | null,
transaction?: Transaction | null
): void;
}

export interface GetSessionInterface {
getSession(callback: GetSessionCallback): void;
}

export class GetSession extends EventEmitter implements GetSessionInterface {
database: Database;
multiplexedSession_?: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
constructor(
database: Database,
poolOptions?: SessionPoolConstructor | SessionPoolOptions,
multiplexedSessionOptions?: MultiplexedSessionOptions | MultiplexedSessionConstructor
) {
super();
this.database = database;
this.pool_ =
typeof poolOptions === 'function'
? new (poolOptions as SessionPoolConstructor)(this.database, null)
: new SessionPool(this.database, poolOptions);
this.multiplexedSession_ =
typeof multiplexedSessionOptions === 'function'
? new (multiplexedSessionOptions as MultiplexedSessionConstructor)(this.database)
: new MultiplexedSession(this.database, multiplexedSessionOptions);
}
getSession(callback: GetSessionCallback): void{
if(process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS==='true') {
this.multiplexedSession_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
} else {
this.pool_?.getSession((err, session) => {
console.log("err: ", err);
err ? callback(err, null) : callback(null, session);
})
}
}
}
1 change: 0 additions & 1 deletion src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ export class Session extends common.GrpcServiceObject {
promisifyAll(Session, {
exclude: [
'delete',
'getMetadata',
'partitionedDml',
'snapshot',
'transaction',
Expand Down
14 changes: 14 additions & 0 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,20 @@ describe('Database', () => {
database.createSession(databaseRole, assert.ifError);
});

it('should send multiplexed correctly', done => {
const multiplexed = {multiplexed: true};
const options = {a: 'b'};
const originalOptions = extend(true, {}, options);

database.request = config => {
assert.deepStrictEqual(config.reqOpts.session.multiplexed, multiplexed.multiplexed);
assert.deepStrictEqual(options, originalOptions);
done();
};

database.createSession(multiplexed, assert.ifError);
});

describe('error', () => {
const ERROR = new Error('Error.');
const API_RESPONSE = {};
Expand Down
128 changes: 128 additions & 0 deletions test/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*!
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as assert from 'assert';
import {before, beforeEach, afterEach, describe, it} from 'mocha';
import * as events from 'events';
import * as extend from 'extend';
import PQueue from 'p-queue';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import stackTrace = require('stack-trace');
import timeSpan = require('time-span');

import {Database} from '../src/database';
import {Session} from '../src/session';
import * as mux from '../src/multiplexed-session';
import {Transaction} from '../src/transaction';
import {grpc} from 'google-gax';

const pQueueOverride: typeof PQueue | null = null;

function FakePQueue(options) {
return new (pQueueOverride || PQueue)(options);
}

FakePQueue.default = FakePQueue;

class FakeTransaction {
options;
constructor(options?) {
this.options = options;
}
async begin(): Promise<void> {}
}

const fakeStackTrace = extend({}, stackTrace);

function noop() {}

describe('MultiplexedSession', () => {
let multiplexedSession: mux.MultiplexedSession;
// tslint:disable-next-line variable-name
let MultiplexedSession: typeof mux.MultiplexedSession;
let inventory;

const DATABASE = {
Database: noop,
databaseRole: 'parent_role',
} as unknown as Database;

const sandbox = sinon.createSandbox();
const shouldNotBeCalled = sandbox.stub().throws('Should not be called.');

const createSession = (name = 'id', props?): Session => {
props = props || {};

return Object.assign(new Session(DATABASE, name), props, {
create: sandbox.stub().resolves(),
transaction: sandbox.stub().returns(new FakeTransaction()),
});
};

before(() => {
MultiplexedSession = proxyquire('../src/multiplexed-session.js', {
'stack-trace': fakeStackTrace,
}).MultiplexedSession;
});

beforeEach(() => {
DATABASE.session = createSession;
multiplexedSession = new MultiplexedSession(DATABASE);
inventory = multiplexedSession._multiplexedInventory;
});

afterEach(() => {
sandbox.restore();
});

describe('getSession', () => {
it('should acquire a session', done => {
const fakeSession = createSession();

sandbox.stub(multiplexedSession, '_acquire').resolves(fakeSession);

multiplexedSession.getSession((err, session) => {
assert.ifError(err);
assert.strictEqual(session, fakeSession);
done();
});
});
it('should pass any errors to the callback', done => {
const error = new Error('err');
sandbox.stub(multiplexedSession, '_acquire').rejects(error);
multiplexedSession.getSession(err => {
assert.strictEqual(err, error);
done();
});
});
it('should pass back the session and txn', done => {
const fakeTxn = new FakeTransaction() as unknown as Transaction;
const fakeSession = createSession();

fakeSession.txn = fakeTxn;

sandbox.stub(multiplexedSession, '_acquire').resolves(fakeSession);

multiplexedSession.getSession((err, session, txn) => {
assert.ifError(err);
assert.strictEqual(session, fakeSession);
assert.strictEqual(txn, fakeTxn);
done();
});
});
});
});

0 comments on commit d2f7502

Please sign in to comment.