diff --git a/observability-test/database.ts b/observability-test/database.ts index cbcc73572..5f9665a1e 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -375,6 +375,115 @@ describe('Database', () => { }); }); + describe('batchCreateSessions', () => { + it('without error', done => { + const ARGS = [null, [{}]]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ifError(err); + assert.ok(sessions); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with error', done => { + const ARGS = [new Error('batchCreateSessions.error'), null]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ok(err); + assert.ok(!sessions); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'batchCreateSessions.error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + describe('getSnapshot', () => { let fakePool: FakeSessionPool; let fakeSession: FakeSession; diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index bf3d93538..196938d81 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -62,6 +62,9 @@ interface setupResults { spannerMock: mock.MockSpanner; } +const selectSql = 'SELECT 1'; +const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + async function setup( observabilityOptions?: typeof ObservabilityOptions ): Promise { @@ -85,8 +88,6 @@ async function setup( ); }); - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; spannerMock.putStatementResult( selectSql, mock.StatementResult.resultSet(createSelect1ResultSet()) @@ -112,65 +113,54 @@ async function setup( } describe('EndToEnd', () => { - describe('Database', () => { - let server: grpc.Server; - let spanner: Spanner; - let database: Database; - let spannerMock: mock.MockSpanner; - let traceExporter: typeof InMemorySpanExporter; + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); - const contextManager = new AsyncHooksContextManager(); - setGlobalContextManager(contextManager); + afterEach(() => { + disableContextAndManager(contextManager); + }); - afterEach(() => { - disableContextAndManager(contextManager); + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - beforeEach(async () => { - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); - spanner = setupResult.spanner; - server = setupResult.server; - spannerMock = setupResult.spannerMock; - - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; - spannerMock.putStatementResult( - selectSql, - mock.StatementResult.resultSet(createSelect1ResultSet()) - ); - spannerMock.putStatementResult( - updateSql, - mock.StatementResult.updateCount(1) - ); + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, + }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; - const instance = spanner.instance('instance'); - database = instance.database('database'); - }); + const instance = spanner.instance('instance'); + database = instance.database('database'); + }); - afterEach(() => { - traceExporter.reset(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); + afterEach(() => { + traceExporter.reset(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -205,7 +195,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -216,14 +205,27 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.getSnapshot', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Begin Transaction', + 'Transaction Creation Done', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -242,7 +244,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -253,14 +254,26 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.getTransaction', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -279,7 +292,6 @@ describe('EndToEnd', () => { .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -290,14 +302,26 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runStream']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.runStream', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -313,7 +337,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); // Sort the spans by duration. spans.sort((spanA, spanB) => { @@ -330,6 +353,8 @@ describe('EndToEnd', () => { }); const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; @@ -359,7 +384,15 @@ describe('EndToEnd', () => { 'Expected that RunStream has a defined spanId' ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -375,7 +408,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -386,14 +418,25 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.runTransaction', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -413,7 +456,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -424,14 +466,71 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.writeAtLeastOnce', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Starting Commit', + 'Commit Done', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('batchCreateSessions', done => { + const blankMutations = new MutationSet(); + database.batchCreateSessions(5, (err, sessions) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.batchCreateSessions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -445,46 +544,32 @@ describe('EndToEnd', () => { }); describe('ObservabilityOptions injection and propagation', async () => { - const globalTraceExporter = new InMemorySpanExporter(); - const globalTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: globalTraceExporter, - }); - globalTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(globalTraceExporter) - ); - globalTracerProvider.register(); + it('Passed into Spanner, Instance and Database', async () => { + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - const injectedTraceExporter = new InMemorySpanExporter(); - const injectedTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: injectedTraceExporter, - }); - injectedTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(injectedTraceExporter) - ); + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; - const observabilityOptions: typeof ObservabilityOptions = { - tracerProvider: injectedTracerProvider, - enableExtendedTracing: true, - }; - - const setupResult = await setup(observabilityOptions); - const spanner = setupResult.spanner; - const server = setupResult.server; - const spannerMock = setupResult.spannerMock; - - after(async () => { - globalTraceExporter.reset(); - injectedTraceExporter.reset(); - await globalTracerProvider.shutdown(); - await injectedTracerProvider.shutdown(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + traceExporter.reset(); + await tracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); - it('Passed into Spanner, Instance and Database', done => { // Ensure that the same observability configuration is set on the Spanner client. assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions); @@ -518,11 +603,45 @@ describe('ObservabilityOptions injection and propagation', async () => { databaseByConstructor._observabilityOptions, observabilityOptions ); - - done(); }); - it('Propagates spans to the injected not global TracerProvider', done => { + it('Propagates spans to the injected not global TracerProvider', async () => { + const globalTraceExporter = new InMemorySpanExporter(); + const globalTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: globalTraceExporter, + }); + globalTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(globalTraceExporter) + ); + globalTracerProvider.register(); + + const injectedTraceExporter = new InMemorySpanExporter(); + const injectedTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: injectedTraceExporter, + }); + injectedTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(injectedTraceExporter) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: injectedTracerProvider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + injectedTraceExporter.reset(); + await injectedTracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + const instance = spanner.instance('instance'); const database = instance.database('database'); @@ -558,6 +677,8 @@ describe('ObservabilityOptions injection and propagation', async () => { }); const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; @@ -568,6 +689,9 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -578,8 +702,6 @@ describe('ObservabilityOptions injection and propagation', async () => { expectedEventNames, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); - - done(); }); }); }); diff --git a/src/database.ts b/src/database.ts index 917bcabcf..168ff34f4 100644 --- a/src/database.ts +++ b/src/database.ts @@ -450,6 +450,11 @@ class Database extends common.GrpcServiceObject { typeof poolOptions === 'function' ? new (poolOptions as SessionPoolConstructor)(this, null) : new SessionPool(this, poolOptions); + const sessionPoolInstance = this.pool_ as SessionPool; + if (sessionPoolInstance) { + sessionPoolInstance._observabilityOptions = + instance._observabilityOptions; + } if (typeof poolOptions === 'object') { this.databaseRole = poolOptions.databaseRole || null; } @@ -677,30 +682,36 @@ class Database extends common.GrpcServiceObject { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'batchCreateSessions', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback!(err, null, resp!); - return; - } + const q = {opts: this._observabilityOptions}; + startTrace('Database.batchCreateSessions', q, span => { + this.request( + { + client: 'SpannerClient', + method: 'batchCreateSessions', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } - const sessions = (resp!.session || []).map(metadata => { - const session = this.session(metadata.name!); - session._observabilityOptions = this._observabilityOptions; - session.metadata = metadata; - return session; - }); + const sessions = (resp!.session || []).map(metadata => { + const session = this.session(metadata.name!); + session._observabilityOptions = this._observabilityOptions; + session.metadata = metadata; + return session; + }); - callback!(null, sessions, resp!); - } - ); + span.end(); + callback!(null, sessions, resp!); + } + ); + }); } /** @@ -2177,6 +2188,7 @@ class Database extends common.GrpcServiceObject { if (!err) { span.addEvent('Using Session', {'session.id': session?.id}); + transaction!._observabilityOptions = this._observabilityOptions; this._releaseOnEnd(session!, transaction!, span); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -3206,6 +3218,8 @@ class Database extends common.GrpcServiceObject { runFn!(err as grpc.ServiceError); return; } + + transaction!._observabilityOptions = this._observabilityOptions; if (options.optimisticLock) { transaction!.useOptimisticLock(); } diff --git a/src/instance.ts b/src/instance.ts index b72f24622..4986e3ecd 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -1363,6 +1363,7 @@ class Instance extends common.GrpcServiceObject { databases = rowDatabases.map(database => { const databaseInstance = self.database(database.name!, {min: 0}); databaseInstance.metadata = database; + databaseInstance._observabilityOptions = this._observabilityOptions; return databaseInstance; }); } diff --git a/src/instrument.ts b/src/instrument.ts index 6cad7bc4a..b0080601a 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -26,8 +26,10 @@ import { import { Span, SpanStatusCode, + context, trace, INVALID_SPAN_CONTEXT, + ROOT_CONTEXT, SpanAttributes, TimeInput, TracerProvider, @@ -93,6 +95,32 @@ interface traceConfig { const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. export {SPAN_NAMESPACE_PREFIX, traceConfig}; +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); + +/* + * This function ensures that async/await works correctly by + * checking if context.active() returns an invalid/unset context + * and if so, sets a global AsyncHooksContextManager otherwise + * spans resulting from async/await invocations won't be correctly + * associated in their respective hierarchies. + */ +function ensureInitialContextManagerSet() { + if (context.active() === ROOT_CONTEXT) { + // If no active context was set previously, trace context propagation cannot + // function correctly with async/await for OpenTelemetry and they acknowledge + // this fact per https://opentelemetry.io/docs/languages/js/context/#active-context + // but we shouldn't make our customers have to invasively edit their code + // nor should they be burdened about these facts, their code should JUST work. + // Please see https://github.com/googleapis/nodejs-spanner/issues/2146 + context.disable(); // Firstly disable any prior contextManager. + const contextManager = new AsyncHooksContextManager(); + contextManager.enable(); + context.setGlobalContextManager(contextManager); + } +} + /** * startTrace begins an active span in the current active context * and passes it back to the set callback function. Each span will @@ -111,6 +139,8 @@ export function startTrace( config = {} as traceConfig; } + ensureInitialContextManagerSet(); + return getTracer(config.opts?.tracerProvider).startActiveSpan( SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix, {kind: SpanKind.CLIENT}, diff --git a/src/session-pool.ts b/src/session-pool.ts index 09300ecde..47ce3c3a6 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,7 +24,12 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + getActiveOrNoopSpan, + setSpanErrorAndException, + startTrace, +} from './instrument'; /** * @callback SessionPoolCloseCallback @@ -353,6 +358,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { _pingHandle!: NodeJS.Timer; _requests: PQueue; _traces: Map; + _observabilityOptions?: ObservabilityOptions; /** * Formats stack trace objects into Node-like stack trace. @@ -485,6 +491,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { }); this._traces = new Map(); + if (!this._observabilityOptions) { + this._observabilityOptions = database._observabilityOptions; + } } /** @@ -738,55 +747,65 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @emits SessionPool#createError */ async _createSessions(amount: number): Promise { - const span = getActiveOrNoopSpan(); - span.addEvent(`Requesting ${amount} sessions`); + const q = {opts: this._observabilityOptions}; + return startTrace('SessionPool.createSessions', q, async span => { + span.addEvent(`Requesting ${amount} sessions`); - const labels = this.options.labels!; - const databaseRole = this.options.databaseRole!; + const labels = this.options.labels!; + const databaseRole = this.options.databaseRole!; - if (amount <= 0) { - return; - } - this._pending += amount; + if (amount <= 0) { + return; + } + this._pending += amount; - let nReturned = 0; - const nRequested: number = amount; + let nReturned = 0; + const nRequested: number = amount; - // while we can request as many sessions be created as we want, the backend - // will return at most 100 at a time, hence the need for a while loop. - while (amount > 0) { - let sessions: Session[] | null = null; + if (!this.database._observabilityOptions) { + this.database._observabilityOptions = this._observabilityOptions; + } - span.addEvent(`Creating ${amount} sessions`); + // while we can request as many sessions be created as we want, the backend + // will return at most 100 at a time, hence the need for a while loop. + while (amount > 0) { + let sessions: Session[] | null = null; + + span.addEvent(`Creating ${amount} sessions`); + + try { + [sessions] = await this.database.batchCreateSessions({ + count: amount, + labels: labels, + databaseRole: databaseRole, + }); + + amount -= sessions.length; + nReturned += sessions.length; + } catch (e) { + this._pending -= amount; + this.emit('createError', e); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + setSpanErrorAndException(span, e as Error); + throw e; + } - try { - [sessions] = await this.database.batchCreateSessions({ - count: amount, - labels: labels, - databaseRole: databaseRole, + sessions.forEach((session: Session) => { + setImmediate(() => { + this._inventory.borrowed.add(session); + this._pending -= 1; + this.release(session); + }); }); - - amount -= sessions.length; - nReturned += sessions.length; - } catch (e) { - this._pending -= amount; - this.emit('createError', e); - span.addEvent( - `Requested for ${nRequested} sessions returned ${nReturned}` - ); - throw e; } - sessions.forEach((session: Session) => { - setImmediate(() => { - this._inventory.borrowed.add(session); - this._pending -= 1; - this.release(session); - }); - }); - } - - span.addEvent(`Requested for ${nRequested} sessions returned ${nReturned}`); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + span.end(); + }); } /** diff --git a/test/spanner.ts b/test/spanner.ts index fc4e11b91..f26d4828d 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -4999,7 +4999,7 @@ describe('Spanner with mock server', () => { // and tests the database/instance suffix is an iteration of // each afresh invocation of newTestDatabase, which has been // causing test flakes. - it('Check for span annotations', () => { + it('Check for span annotations', done => { const exporter = new InMemorySpanExporter(); const provider = new NodeTracerProvider({ sampler: new AlwaysOnSampler(), @@ -5014,44 +5014,67 @@ describe('Spanner with mock server', () => { const opts: typeof ObservabilityOptions = {tracerProvider: provider}; startTrace('aSpan', {opts: opts}, span => { + instance._observabilityOptions = opts; const database = newTestDatabase(); database._observabilityOptions = opts; - async function runIt() { - const query = { - sql: 'SELECT 1', - }; + const query = { + sql: 'SELECT 1', + }; - const [rows] = await database.run(query); + database.run(query, (err, rows) => { assert.strictEqual(rows.length, 1); - } - runIt(); + span.end(); - span.end(); + exporter.forceFlush(); + const spans = exporter.getFinishedSpans(); - const spans = exporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span'); - const span0 = spans[0]; - const events = span0.events; + // Sort the spans by startTime. + spans.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; + }); - // Sort the events by earliest time of occurence. - events.sort((evtA, evtB) => { - return evtA.time < evtB.time; - }); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); - const gotEventNames: string[] = []; - events.forEach(event => { - gotEventNames.push(event.name); - }); + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + 'CloudSpanner.aSpan', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); - const wantEventNames = ['Requesting 25 sessions', 'Creating 25 sessions']; + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + + assert.deepEqual( + actualEventNames, + expectedEventNames, + `Mismatched events\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); - assert.deepEqual( - gotEventNames, - wantEventNames, - `Mismatched events\n\tGot: ${gotEventNames}\n\tWant: ${wantEventNames}` - ); + done(); + }); }); }); });