Skip to content

Commit

Permalink
fix(NODE-5751): RTTPinger always sends legacy hello (#3923)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Nov 16, 2023
1 parent c698918 commit bc3d020
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 38 deletions.
44 changes: 23 additions & 21 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
Expand Down Expand Up @@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
Expand Down Expand Up @@ -197,8 +195,8 @@ function resetMonitorState(monitor: Monitor) {
monitor[kMonitorId]?.stop();
monitor[kMonitorId] = undefined;

monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

monitor[kCancellationToken].emit('cancel');

Expand Down Expand Up @@ -251,8 +249,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(
if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand All @@ -271,9 +269,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const rttPinger = monitor[kRTTPinger];
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
Expand All @@ -289,8 +288,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
start = now();
} else {
monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
Expand Down Expand Up @@ -383,7 +382,7 @@ export interface RTTPingerOptions extends ConnectionOptions {
/** @internal */
export class RTTPinger {
/** @internal */
[kConnection]?: Connection;
connection?: Connection;
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
Expand All @@ -393,7 +392,7 @@ export class RTTPinger {
closed: boolean;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
this[kConnection] = undefined;
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
Expand All @@ -410,8 +409,8 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this[kConnection]?.destroy({ force: true });
this[kConnection] = undefined;
this.connection?.destroy({ force: true });
this.connection = undefined;
}
}

Expand All @@ -430,8 +429,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
Expand All @@ -441,11 +440,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
);
}

const connection = rttPinger[kConnection];
const connection = rttPinger.connection;
if (connection == null) {
connect(options, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand All @@ -456,9 +455,12 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined, err => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection?.destroy({ force: true });
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand Down
24 changes: 10 additions & 14 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kMonitor = Symbol('monitor');

/** @public */
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
MonitorOptions;
Expand Down Expand Up @@ -118,7 +115,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
s: ServerPrivate;
serverApi?: ServerApi;
hello?: Document;
[kMonitor]: Monitor | null;
monitor: Monitor | null;

/** @event */
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
Expand Down Expand Up @@ -164,22 +161,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {
});

if (this.loadBalanced) {
this[kMonitor] = null;
this.monitor = null;
// monitoring is disabled in load balancing mode
return;
}

// create the monitor
// TODO(NODE-4144): Remove new variable for type narrowing
const monitor = new Monitor(this, this.s.options);
this[kMonitor] = monitor;
this.monitor = new Monitor(this, this.s.options);

for (const event of HEARTBEAT_EVENTS) {
monitor.on(event, (e: any) => this.emit(event, e));
this.monitor.on(event, (e: any) => this.emit(event, e));
}

monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
Expand Down Expand Up @@ -235,7 +231,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// a load balancer. It never transitions out of this state and
// has no monitor.
if (!this.loadBalanced) {
this[kMonitor]?.connect();
this.monitor?.connect();
} else {
stateTransition(this, STATE_CONNECTED);
this.emit(Server.CONNECT, this);
Expand All @@ -258,7 +254,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
stateTransition(this, STATE_CLOSING);

if (!this.loadBalanced) {
this[kMonitor]?.close();
this.monitor?.close();
}

this.s.pool.close(options, err => {
Expand All @@ -276,7 +272,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
requestCheck(): void {
if (!this.loadBalanced) {
this[kMonitor]?.requestCheck();
this.monitor?.requestCheck();
}
}

Expand Down Expand Up @@ -437,7 +433,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
}

if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor]?.reset();
server.monitor?.reset();
}

server.emit(
Expand Down
175 changes: 175 additions & 0 deletions test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { expect } from 'chai';
import * as semver from 'semver';
import * as sinon from 'sinon';

import { type Connection, type MongoClient, type RTTPinger } from '../../../src';
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
import { sleep } from '../../tools/utils';

/**
* RTTPingers are only created after getting a hello from the server that defines topologyVersion
* Each monitor is reaching out to a different node and rttPinger's are created async as a result.
*
* This function checks for rttPingers and sleeps if none are found.
*/
async function getRTTPingers(client: MongoClient) {
type RTTPingerConnection = Omit<RTTPinger, 'connection'> & { connection: Connection };
const pingers = (rtt => rtt?.connection != null) as (r?: RTTPinger) => r is RTTPingerConnection;

if (!client.topology) expect.fail('Must provide a connected client');

// eslint-disable-next-line no-constant-condition
while (true) {
const servers = client.topology.s.servers.values();
const rttPingers = Array.from(servers, s => s.monitor?.rttPinger).filter(pingers);

if (rttPingers.length !== 0) {
return rttPingers;
}

await sleep(5);
}
}

describe('class RTTPinger', () => {
afterEach(() => sinon.restore());

beforeEach(async function () {
if (!this.currentTest) return;
if (this.configuration.isLoadBalanced) {
this.currentTest.skipReason = 'No monitoring in LB mode, test not relevant';
return this.skip();
}
if (semver.gte('4.4.0', this.configuration.version)) {
this.currentTest.skipReason =
'Test requires streaming monitoring, needs to be on MongoDB 4.4+';
return this.skip();
}
});

context('when serverApi is enabled', () => {
let serverApiClient: MongoClient;

beforeEach(async function () {
if (!this.currentTest) return;

if (semver.gte('5.0.0', this.configuration.version)) {
this.currentTest.skipReason = 'Test requires serverApi, needs to be on MongoDB 5.0+';
return this.skip();
}

serverApiClient = this.configuration.newClient(
{},
{ serverApi: { version: '1', strict: true }, heartbeatFrequencyMS: 10 }
);
});

afterEach(async () => {
await serverApiClient?.close();
});

it('measures rtt with a hello command', async function () {
await serverApiClient.connect();
const rttPingers = await getRTTPingers(serverApiClient);

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
}
});
});

context('when serverApi is disabled', () => {
let client: MongoClient;

beforeEach(async function () {
if (!this.currentTest) return;
if (this.configuration.serverApi) {
this.currentTest.skipReason = 'Test requires serverApi to NOT be enabled';
return this.skip();
}

client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
});

afterEach(async () => {
await client?.close();
});

context('connected to a pre-hello server', () => {
it('measures rtt with a LEGACY_HELLO_COMMAND command', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

// Fake pre-hello server.
// Hello was back-ported to feature versions of the server so we would need to pin
// versions prior to 4.4.2, 4.2.10, 4.0.21, and 3.6.21 to integration test
for (const rtt of rttPingers) rtt.connection.helloOk = false;

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(
sinon.match.any,
{ [LEGACY_HELLO_COMMAND]: 1 },
sinon.match.any
);
}
});
});

context('connected to a helloOk server', () => {
it('measures rtt with a hello command', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

// We should always be connected to helloOk servers
for (const rtt of rttPingers) expect(rtt.connection).to.have.property('helloOk', true);

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
}
});
});
});

context(`when the RTTPinger's hello command receives any error`, () => {
let client: MongoClient;
beforeEach(async function () {
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
});

afterEach(async () => {
await client?.close();
});

it('destroys the connection with force=true', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

for (const rtt of rttPingers) {
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
}
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWithExactly({ force: true });
}
});
});
});
Loading

0 comments on commit bc3d020

Please sign in to comment.