diff --git a/observability-test/batch-transaction.ts b/observability-test/batch-transaction.ts index 13ce59c5e..763fb7e36 100644 --- a/observability-test/batch-transaction.ts +++ b/observability-test/batch-transaction.ts @@ -139,7 +139,7 @@ describe('BatchTransaction', () => { batchTransaction = new BatchTransaction(SESSION as {} as Session); batchTransaction.session = SESSION as {} as Session; batchTransaction.id = ID; - batchTransaction.observabilityOptions = {tracerProvider: provider}; + batchTransaction._observabilityOptions = {tracerProvider: provider}; REQUEST.callsFake((_, callback) => callback(null, RESPONSE)); }); diff --git a/observability-test/database.ts b/observability-test/database.ts index 8329e81eb..cbcc73572 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -241,7 +241,7 @@ describe('Database', () => { database = new Database(INSTANCE, NAME, POOL_OPTIONS); database.parent = INSTANCE; database.databaseRole = 'parent_role'; - database.observabilityConfig = { + database._observabilityOptions = { tracerProvider: provider, enableExtendedTracing: false, }; diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 933e9bf08..bf3d93538 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -17,7 +17,7 @@ import * as assert from 'assert'; import {grpc} from 'google-gax'; import {google} from '../protos/protos'; -import {Database, Spanner} from '../src'; +import {Database, Instance, Spanner} from '../src'; import {MutationSet} from '../src/transaction'; import protobuf = google.spanner.v1; import * as mock from '../test/mockserver/mockspanner'; @@ -35,6 +35,8 @@ const { AsyncHooksContextManager, } = require('@opentelemetry/context-async-hooks'); +const {ObservabilityOptions} = require('../src/instrument'); + /** A simple result set for SELECT 1. */ function createSelect1ResultSet(): protobuf.ResultSet { const fields = [ @@ -60,7 +62,9 @@ interface setupResults { spannerMock: mock.MockSpanner; } -async function setup(): Promise { +async function setup( + observabilityOptions?: typeof ObservabilityOptions +): Promise { const server = new grpc.Server(); const spannerMock = mock.createMockSpanner(server); @@ -97,6 +101,7 @@ async function setup(): Promise { servicePath: 'localhost', port, sslCreds: grpc.credentials.createInsecure(), + observabilityOptions: observabilityOptions, }); return Promise.resolve({ @@ -122,7 +127,16 @@ describe('EndToEnd', () => { }); beforeEach(async () => { - const setupResult = await setup(); + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, + }); spanner = setupResult.spanner; server = setupResult.server; spannerMock = setupResult.spannerMock; @@ -138,21 +152,10 @@ describe('EndToEnd', () => { mock.StatementResult.updateCount(1) ); - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); const instance = spanner.instance('instance'); database = instance.database('database'); - database.observabilityConfig = { - tracerProvider: provider, - enableExtendedTracing: false, - }; }); afterEach(() => { @@ -440,3 +443,143 @@ describe('EndToEnd', () => { }); }); }); + +describe('ObservabilityOptions injection and propagation', async () => { + const globalTraceExporter = new InMemorySpanExporter(); + const globalTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: globalTraceExporter, + }); + globalTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(globalTraceExporter) + ); + globalTracerProvider.register(); + + const injectedTraceExporter = new InMemorySpanExporter(); + const injectedTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: injectedTraceExporter, + }); + injectedTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(injectedTraceExporter) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: injectedTracerProvider, + enableExtendedTracing: true, + }; + + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + globalTraceExporter.reset(); + injectedTraceExporter.reset(); + await globalTracerProvider.shutdown(); + await injectedTracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + it('Passed into Spanner, Instance and Database', done => { + // Ensure that the same observability configuration is set on the Spanner client. + assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions); + + // Acquire a handle to the Instance through spanner.instance. + const instanceByHandle = spanner.instance('instance'); + assert.deepStrictEqual( + instanceByHandle._observabilityOptions, + observabilityOptions + ); + + // Create the Instance by means of a constructor directly. + const instanceByConstructor = new Instance(spanner, 'myInstance'); + assert.deepStrictEqual( + instanceByConstructor._observabilityOptions, + observabilityOptions + ); + + // Acquire a handle to the Database through instance.database. + const databaseByHandle = instanceByHandle.database('database'); + assert.deepStrictEqual( + databaseByHandle._observabilityOptions, + observabilityOptions + ); + + // Create the Database by means of a constructor directly. + const databaseByConstructor = new Database( + instanceByConstructor, + 'myDatabase' + ); + assert.deepStrictEqual( + databaseByConstructor._observabilityOptions, + observabilityOptions + ); + + done(); + }); + + it('Propagates spans to the injected not global TracerProvider', done => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + database.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + injectedTraceExporter.forceFlush(); + globalTraceExporter.forceFlush(); + const spansFromInjected = injectedTraceExporter.getFinishedSpans(); + const spansFromGlobal = globalTraceExporter.getFinishedSpans(); + + assert.strictEqual( + spansFromGlobal.length, + 0, + 'Expecting no spans from the global exporter' + ); + assert.strictEqual( + spansFromInjected.length > 0, + true, + 'Expecting spans from the injected exporter' + ); + + spansFromInjected.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; + }); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spansFromInjected.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); +}); diff --git a/observability-test/table.ts b/observability-test/table.ts index 00071510c..86f6145f9 100644 --- a/observability-test/table.ts +++ b/observability-test/table.ts @@ -92,7 +92,7 @@ describe('Table', () => { extend(Table, TableCached); table = new Table(DATABASE, NAME); transaction = new FakeTransaction(); - table.observabilityOptions = {tracerProvider: provider}; + table._observabilityOptions = {tracerProvider: provider}; }); afterEach(() => { diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index cea784b03..403f7dd6e 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -139,7 +139,7 @@ class BatchTransaction extends Snapshot { const traceConfig: traceConfig = { sql: query, - opts: this.observabilityOptions, + opts: this._observabilityOptions, }; return startTrace( 'BatchTransaction.createQueryPartitions', @@ -182,7 +182,7 @@ class BatchTransaction extends Snapshot { */ createPartitions_(config, callback) { const traceConfig: traceConfig = { - opts: this.observabilityOptions, + opts: this._observabilityOptions, }; return startTrace( @@ -259,7 +259,7 @@ class BatchTransaction extends Snapshot { */ createReadPartitions(options, callback) { const traceConfig: traceConfig = { - opts: this.observabilityOptions, + opts: this._observabilityOptions, }; return startTrace( diff --git a/src/database.ts b/src/database.ts index a805d7a84..917bcabcf 100644 --- a/src/database.ts +++ b/src/database.ts @@ -344,7 +344,7 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; - observabilityConfig: ObservabilityOptions | undefined; + _observabilityOptions?: ObservabilityOptions; constructor( instance: Instance, name: string, @@ -467,7 +467,7 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); - this.observabilityConfig = instance.observabilityConfig; + this._observabilityOptions = instance._observabilityOptions; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -693,7 +693,7 @@ class Database extends common.GrpcServiceObject { const sessions = (resp!.session || []).map(metadata => { const session = this.session(metadata.name!); - session.observabilityConfig = this.observabilityConfig; + session._observabilityOptions = this._observabilityOptions; session.metadata = metadata; return session; }); @@ -738,6 +738,7 @@ class Database extends common.GrpcServiceObject { const id = identifier.transaction; const transaction = new BatchTransaction(session, options); transaction.id = id; + transaction._observabilityOptions = this._observabilityOptions; transaction.readTimestamp = identifier.timestamp as PreciseDate; return transaction; } @@ -827,7 +828,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; return startTrace('Database.createBatchTransaction', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -1085,6 +1086,7 @@ class Database extends common.GrpcServiceObject { /CREATE TABLE `*([^\s`(]+)/ )![1]; const table = this.table(tableName!); + table._observabilityOptions = this._observabilityOptions; callback!(null, table, operation!, resp!); }); } @@ -1873,7 +1875,7 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; return startTrace('Database.getSessions', q, span => { this.request< google.spanner.v1.ISession, @@ -1895,7 +1897,7 @@ class Database extends common.GrpcServiceObject { sessionInstances = sessions.map(metadata => { const session = self.session(metadata.name!); session.metadata = metadata; - session.observabilityConfig = this.observabilityConfig; + session._observabilityOptions = this._observabilityOptions; return session; }); } @@ -2056,7 +2058,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; return startTrace('Database.getSnapshot', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -2157,7 +2159,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as GetTransactionOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; return startTrace('Database.getTransaction', q, span => { this.pool_.getSession((err, session, transaction) => { if (options.requestOptions) { @@ -2784,7 +2786,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {sql: query, opts: this.observabilityConfig}; + const q = {sql: query, opts: this._observabilityOptions}; return startTrace('Database.run', q, span => { this.runStream(query, options) .on('error', err => { @@ -3005,7 +3007,7 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const q = {sql: query, opts: this.observabilityConfig}; + const q = {sql: query, opts: this._observabilityOptions}; return startTrace('Database.runStream', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -3183,7 +3185,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; startTrace('Database.runTransaction', q, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { @@ -3576,7 +3578,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as CallOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this._observabilityOptions}; return startTrace('Database.writeAtLeastOnce', q, span => { this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { diff --git a/src/index.ts b/src/index.ts index f6eedf081..db3110568 100644 --- a/src/index.ts +++ b/src/index.ts @@ -80,6 +80,7 @@ import { import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; +import {ObservabilityOptions} from './instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -130,6 +131,7 @@ export type GetInstanceConfigOperationsCallback = PagedCallback< * @property {google.spanner.v1.IDirectedReadOptions} [directedReadOptions] Sets the DirectedReadOptions for all ReadRequests and ExecuteSqlRequests for the Client. * Indicates which replicas or regions should be used for non-transactional reads or queries. * DirectedReadOptions won't be set for readWrite transactions" + * @property {ObservabilityOptions} [observabilityOptions] Sets the observability options to be used for OpenTelemetry tracing */ export interface SpannerOptions extends GrpcClientOptions { apiEndpoint?: string; @@ -138,6 +140,7 @@ export interface SpannerOptions extends GrpcClientOptions { sslCreds?: grpc.ChannelCredentials; routeToLeaderEnabled?: boolean; directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null; + observabilityOptions?: ObservabilityOptions; } export interface RequestConfig { client: string; @@ -239,6 +242,7 @@ class Spanner extends GrpcService { resourceHeader_: {[k: string]: string}; routeToLeaderEnabled = true; directedReadOptions: google.spanner.v1.IDirectedReadOptions | null; + _observabilityOptions: ObservabilityOptions | undefined; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -365,6 +369,7 @@ class Spanner extends GrpcService { [CLOUD_RESOURCE_HEADER]: this.projectFormattedName_, }; this.directedReadOptions = directedReadOptions; + this._observabilityOptions = options.observabilityOptions; } /** @@ -585,6 +590,7 @@ class Spanner extends GrpcService { return; } const instance = this.instance(formattedName); + instance._observabilityOptions = this._observabilityOptions; callback!(null, instance, operation, resp); } ); @@ -2052,3 +2058,4 @@ import IInstanceConfig = instanceAdmin.spanner.admin.instance.v1.IInstanceConfig export {v1, protos}; export default {Spanner}; export {Float32, Float, Int, Struct, Numeric, PGNumeric, SpannerDate}; +export {ObservabilityOptions}; diff --git a/src/instance.ts b/src/instance.ts index 00b584e42..b72f24622 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -51,6 +51,7 @@ import {google as instanceAdmin} from '../protos/protos'; import {google as databaseAdmin} from '../protos/protos'; import {google as spannerClient} from '../protos/protos'; import {CreateInstanceRequest} from './index'; +import {ObservabilityOptions} from './instrument'; export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup; export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase; @@ -164,6 +165,7 @@ class Instance extends common.GrpcServiceObject { databases_: Map; metadata?: IInstance; resourceHeader_: {[k: string]: string}; + _observabilityOptions?: ObservabilityOptions; constructor(spanner: Spanner, name: string) { const formattedName_ = Instance.formatName_(spanner.projectId, name); const methods = { @@ -239,6 +241,7 @@ class Instance extends common.GrpcServiceObject { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; + this._observabilityOptions = spanner._observabilityOptions; } /** @@ -925,6 +928,7 @@ class Instance extends common.GrpcServiceObject { return; } const database = this.database(name, poolOptions || poolCtor); + database._observabilityOptions = this._observabilityOptions; callback(null, database, operation, resp); } ); @@ -973,10 +977,9 @@ class Instance extends common.GrpcServiceObject { } const key = name.split('/').pop() + optionsKey; if (!this.databases_.has(key!)) { - this.databases_.set( - key!, - new Database(this, name, poolOptions, queryOptions) - ); + const db = new Database(this, name, poolOptions, queryOptions); + db._observabilityOptions = this._observabilityOptions; + this.databases_.set(key!, db); } return this.databases_.get(key!)!; } diff --git a/src/instrument.ts b/src/instrument.ts index f0676ea46..6cad7bc4a 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -45,12 +45,21 @@ interface SQLStatement { sql: string; } -interface observabilityOptions { +/* + * ObservabilityOptions defines the configuration to be used for Spanner OpenTelemetry Traces. + * @property [tracerProvider] Sets the TracerProvider to be used for traces, + * Global TracerProvider will be used as a fallback. + * @property [enableExtendedTracing] Sets whether to enable extended OpenTelemetry tracing. Enabling this option will add the + * following additional attributes to the traces that are generated by the client + * db.statement: Contains the SQL statement that is being executed. + * Alternatively, you could set environment variable `SPANNER_ENABLE_EXTENDED_TRACING=true`. + */ +interface ObservabilityOptions { tracerProvider: TracerProvider; enableExtendedTracing?: boolean; } -export type {observabilityOptions as ObservabilityOptions}; +export type {ObservabilityOptions}; export type {Span}; const TRACER_NAME = 'cloud.google.com/nodejs/spanner'; @@ -77,7 +86,7 @@ interface traceConfig { sql?: string | SQLStatement; tableName?: string; dbName?: string; - opts?: observabilityOptions; + opts?: ObservabilityOptions; that?: Object; } diff --git a/src/table.ts b/src/table.ts index 87e07db89..a435b7a40 100644 --- a/src/table.ts +++ b/src/table.ts @@ -99,7 +99,7 @@ const POSTGRESQL = 'POSTGRESQL'; class Table { database: Database; name: string; - observabilityOptions?: ObservabilityOptions; + _observabilityOptions?: ObservabilityOptions; constructor(database: Database, name: string) { /** * The {@link Database} instance of this {@link Table} instance. @@ -113,6 +113,7 @@ class Table { * @type {string} */ this.name = name; + this._observabilityOptions = database._observabilityOptions; } /** * Create a table. @@ -1080,7 +1081,7 @@ class Table { callback: CommitCallback ): void { const traceConfig: traceConfig = { - opts: this.observabilityOptions, + opts: this._observabilityOptions, }; startTrace('Table.' + method, traceConfig, span => { diff --git a/src/transaction.ts b/src/transaction.ts index 1a375ff0f..ca96864e1 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -286,7 +286,7 @@ export class Snapshot extends EventEmitter { queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; requestOptions?: Pick; - observabilityOptions?: ObservabilityOptions; + _observabilityOptions?: ObservabilityOptions; /** * The transaction ID. diff --git a/test/spanner.ts b/test/spanner.ts index cdb41b689..fc4e11b91 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -5015,7 +5015,7 @@ describe('Spanner with mock server', () => { const opts: typeof ObservabilityOptions = {tracerProvider: provider}; startTrace('aSpan', {opts: opts}, span => { const database = newTestDatabase(); - database.observabilityConfig = opts; + database._observabilityOptions = opts; async function runIt() { const query = {