Skip to content

Commit

Permalink
observability: add SessionPool span annotations (googleapis#2101)
Browse files Browse the repository at this point in the history
This change adds span annotations on the active span which could either be the noopSpan or the top-level active one, for whenever we progressively add more spans.

Built from googleapis#2087
Updates googleapis#2079
  • Loading branch information
odeke-em authored Sep 13, 2024
1 parent 84b99ef commit d8ab278
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 12 deletions.
12 changes: 2 additions & 10 deletions observability-test/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,8 @@ describe('startTrace', () => {
startTrace('mySpan', {}, span => {
span.end();

const spans = globalExporter.getFinishedSpans();
assert.strictEqual(
spans.length,
1,
'Exactly 1 span must have been exported'
);
const span0 = spans[0];

assert.equal(
span0.name,
span.name,
SPAN_NAMESPACE_PREFIX + '.mySpan',
'name mismatch'
);
Expand Down Expand Up @@ -317,7 +309,7 @@ describe('getActiveOrNoopSpan', () => {
assert.strictEqual(
span.name,
activeSpan.name,
'names must match between activeSpan or current one'
`names must match between activeSpan or current one\n\tGot: ${span.name}\n\tWant: ${activeSpan.name}`
);
assert.strictEqual(
span.startTime,
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
"@google-cloud/promisify": "^4.0.0",
"@grpc/proto-loader": "^0.7.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "^1.25.1",
"@opentelemetry/semantic-conventions": "^1.25.1",
"@types/big.js": "^6.0.0",
"@types/stack-trace": "0.0.33",
Expand All @@ -86,6 +85,9 @@
"through2": "^4.0.0"
},
"devDependencies": {
"@opentelemetry/context-async-hooks": "^1.25.1",
"@opentelemetry/sdk-trace-base": "^1.26.0",
"@opentelemetry/sdk-trace-node": "^1.26.0",
"@types/concat-stream": "^2.0.0",
"@types/extend": "^3.0.0",
"@types/is": "0.0.25",
Expand Down Expand Up @@ -116,7 +118,6 @@
"mv": "^2.1.1",
"ncp": "^2.0.0",
"nise": "6.0.0",
"@opentelemetry/sdk-trace-node": "^1.25.1",
"p-limit": "^3.0.1",
"path-to-regexp": "6.2.2",
"proxyquire": "^2.0.1",
Expand Down
64 changes: 64 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import Policy = google.iam.v1.Policy;
import FieldMask = google.protobuf.FieldMask;
import IDatabase = google.spanner.admin.database.v1.IDatabase;
import snakeCase = require('lodash.snakecase');
import {getActiveOrNoopSpan} from './instrument';

export type GetDatabaseRolesCallback = RequestCallback<
IDatabaseRole,
Expand Down Expand Up @@ -824,10 +825,17 @@ class Database extends common.GrpcServiceObject {
const transaction = this.batchTransaction({session: session!}, options);
this._releaseOnEnd(session!, transaction);
transaction.begin((err, resp) => {
const span = getActiveOrNoopSpan();
if (err) {
if (isSessionNotFoundError(err)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
}
callback!(err, null, resp!);
return;
}
span.addEvent('Using Session', {'session.id': session?.id});
callback!(null, transaction, resp!);
});
});
Expand Down Expand Up @@ -2031,12 +2039,17 @@ class Database extends common.GrpcServiceObject {
const snapshot = session!.snapshot(options, this.queryOptions_);

snapshot.begin(err => {
const span = getActiveOrNoopSpan();
if (err) {
if (isSessionNotFoundError(err)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
session!.lastError = err;
this.pool_.release(session!);
this.getSnapshot(options, callback!);
} else {
span.addEvent('Using Session', {'session.id': session?.id});
this.pool_.release(session!);
callback!(err);
}
Expand Down Expand Up @@ -2121,7 +2134,14 @@ class Database extends common.GrpcServiceObject {
transaction!.excludeTxnFromChangeStreams();
}
if (!err) {
const span = getActiveOrNoopSpan();
span.addEvent('Using Session', {'session.id': session?.id});
this._releaseOnEnd(session!, transaction!);
} else if (isSessionNotFoundError(err as grpc.ServiceError)) {
const span = getActiveOrNoopSpan();
span.addEvent('No session available', {
'session.id': session?.id,
});
}
cb!(err as grpc.ServiceError | null, transaction);
});
Expand Down Expand Up @@ -2348,6 +2368,8 @@ class Database extends common.GrpcServiceObject {
callback!(err as ServiceError, null);
return;
}
const span = getActiveOrNoopSpan();
span.addEvent('Using Session', {'session.id': session?.id});
config.reqOpts.session = session!.formattedName_;
this.request<Session>(config, (err, ...args) => {
pool.release(session!);
Expand Down Expand Up @@ -2389,10 +2411,17 @@ class Database extends common.GrpcServiceObject {
}
waitForSessionStream.on('reading', () => {
pool.getSession((err, session_) => {
const span = getActiveOrNoopSpan();
if (err) {
if (isSessionNotFoundError(err as grpc.ServiceError)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
}
destroyStream(err as ServiceError);
return;
}
span.addEvent('Using Session', {'session.id': session_?.id});
session = session_!;
config.reqOpts.session = session!.formattedName_;
requestStream = self.requestStream(config);
Expand Down Expand Up @@ -2924,13 +2953,17 @@ class Database extends common.GrpcServiceObject {
options?: TimestampBounds
): PartialResultStream {
const proxyStream: Transform = through.obj();
const span = getActiveOrNoopSpan();

this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
return;
}

const span = getActiveOrNoopSpan();
span.addEvent('Using Session', {'session.id': session?.id});

const snapshot = session!.snapshot(options, this.queryOptions_);

this._releaseOnEnd(session!, snapshot);
Expand All @@ -2951,6 +2984,9 @@ class Database extends common.GrpcServiceObject {
if (session) {
session.lastError = err as grpc.ServiceError;
}
span.addEvent('No session available', {
'session.id': session?.id,
});
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.removeListener('end', endListener);
Expand Down Expand Up @@ -3081,7 +3117,11 @@ class Database extends common.GrpcServiceObject {
: {};

this.pool_.getSession((err, session?, transaction?) => {
const span = getActiveOrNoopSpan();
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
this.runTransaction(options, runFn!);
return;
}
Expand All @@ -3105,10 +3145,17 @@ class Database extends common.GrpcServiceObject {
);

runner.run().then(release, err => {
const span = getActiveOrNoopSpan();
if (isSessionNotFoundError(err)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
release();
this.runTransaction(options, runFn!);
} else {
if (!err) {
span.addEvent('Using Session', {'session.id': session!.id});
}
setImmediate(runFn!, err);
release();
}
Expand Down Expand Up @@ -3195,7 +3242,9 @@ class Database extends common.GrpcServiceObject {
? (optionsOrRunFn as RunTransactionOptions)
: {};

let sessionId = '';
const getSession = this.pool_.getSession.bind(this.pool_);
const span = getActiveOrNoopSpan();
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
Expand All @@ -3212,6 +3261,8 @@ class Database extends common.GrpcServiceObject {
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
Expand All @@ -3226,6 +3277,9 @@ class Database extends common.GrpcServiceObject {
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
throw e;
}
}
Expand Down Expand Up @@ -3301,6 +3355,10 @@ class Database extends common.GrpcServiceObject {
proxyStream.destroy(err);
return;
}

const span = getActiveOrNoopSpan();
span.addEvent('Using Session', {'session.id': session?.id});

const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
Expand Down Expand Up @@ -3332,6 +3390,7 @@ class Database extends common.GrpcServiceObject {
if (session) {
session.lastError = err as grpc.ServiceError;
}
span.addEvent('No session available', {'session.id': session?.id});
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
Expand Down Expand Up @@ -3424,14 +3483,19 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as CallOptions)
: {};
this.pool_.getSession((err, session?, transaction?) => {
const span = getActiveOrNoopSpan();
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
span.addEvent('No session available', {
'session.id': session?.id,
});
this.writeAtLeastOnce(mutations, options, cb!);
return;
}
if (err) {
cb!(err as grpc.ServiceError);
return;
}
span.addEvent('Using Session', {'session.id': session?.id});
this._releaseOnEnd(session!, transaction!);
transaction?.setQueuedMutations(mutations.proto());
return transaction?.commit(options, cb!);
Expand Down
37 changes: 37 additions & 0 deletions src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {Transaction} from './transaction';
import {NormalCallback} from './common';
import {GoogleError, grpc, ServiceError} from 'google-gax';
import trace = require('stack-trace');
import {getActiveOrNoopSpan} from './instrument';

/**
* @callback SessionPoolCloseCallback
Expand Down Expand Up @@ -630,7 +631,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
* @returns {Promise<Session>}
*/
async _acquire(): Promise<Session> {
const span = getActiveOrNoopSpan();
if (!this.isOpen) {
span.addEvent('SessionPool is closed');
throw new GoogleError(errors.Closed);
}

Expand All @@ -642,18 +645,30 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
// wrapping this logic in a function to call recursively if the session
// we end up with is already dead
const getSession = async (): Promise<Session> => {
span.addEvent('Acquiring session');
const elapsed = Date.now() - startTime;

if (elapsed >= timeout!) {
span.addEvent('Could not acquire session due to an exceeded timeout');
throw new GoogleError(errors.Timeout);
}

const session = await this._getSession(startTime);

if (this._isValidSession(session)) {
span.addEvent('Acquired session', {
'time.elapsed': Date.now() - startTime,
'session.id': session.id.toString(),
});
return session;
}

span.addEvent(
'Could not acquire session because it was invalid. Retrying',
{
'session.id': session.id.toString(),
}
);
this._inventory.borrowed.delete(session);
return getSession();
};
Expand Down Expand Up @@ -723,6 +738,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
* @emits SessionPool#createError
*/
async _createSessions(amount: number): Promise<void> {
const span = getActiveOrNoopSpan();
span.addEvent(`Requesting ${amount} sessions`);

const labels = this.options.labels!;
const databaseRole = this.options.databaseRole!;

Expand All @@ -731,11 +749,16 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
}
this._pending += amount;

let nReturned = 0;
const nRequested: number = amount;

// while we can request as many sessions be created as we want, the backend
// will return at most 100 at a time, hence the need for a while loop.
while (amount > 0) {
let sessions: Session[] | null = null;

span.addEvent(`Creating ${amount} sessions`);

try {
[sessions] = await this.database.batchCreateSessions({
count: amount,
Expand All @@ -744,9 +767,13 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
});

amount -= sessions.length;
nReturned += sessions.length;
} catch (e) {
this._pending -= amount;
this.emit('createError', e);
span.addEvent(
`Requested for ${nRequested} sessions returned ${nReturned}`
);
throw e;
}

Expand All @@ -758,6 +785,8 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
});
});
}

span.addEvent(`Requested for ${nRequested} sessions returned ${nReturned}`);
}

/**
Expand Down Expand Up @@ -865,17 +894,21 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
* @returns {Promise<Session>}
*/
async _getSession(startTime: number): Promise<Session> {
const span = getActiveOrNoopSpan();
if (this._hasSessionUsableFor()) {
span.addEvent('Cache hit: has usable session');
return this._borrowNextAvailableSession();
}
if (this.isFull && this.options.fail!) {
span.addEvent('Session pool is full and failFast=true');
throw new SessionPoolExhaustedError(this._getLeaks());
}

let removeOnceCloseListener: Function;
let removeListener: Function;

// Wait for a session to become available.
span.addEvent('Waiting for a session to become available');
const availableEvent = 'session-available';
const promises = [
new Promise((_, reject) => {
Expand Down Expand Up @@ -990,6 +1023,10 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
* @returns {Promise}
*/
async _ping(session: Session): Promise<void> {
// NOTE: Please do not trace Ping as it gets quite spammy
// with many root spans polluting the main span.
// Please see https://github.com/googleapis/google-cloud-go/issues/1691

this._borrow(session);

if (!this._isValidSession(session)) {
Expand Down
Loading

0 comments on commit d8ab278

Please sign in to comment.