From e848ff112ae159a432f6ba31006a34baba38947f Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 25 Jun 2024 17:41:50 +0300 Subject: [PATCH] feat: instrument src/ with traces using OpenTelemetry This change implements tracing of RPC calls using OpenTelemetry to aid in providing observability. Fixes #2079 --- README.md | 97 +++ observability-test/grpc-instrumentation.ts | 103 +++ observability-test/observability.ts | 307 ++++++++ package.json | 14 +- samples/observability-traces.js | 464 ++++++++++++ samples/package.json | 20 +- src/batch-transaction.ts | 144 ++-- src/database.ts | 505 ++++++------ src/index.ts | 616 ++++++++------- src/instance.ts | 507 +++++++------ src/instrument.ts | 249 ++++++ src/session-pool.ts | 76 +- src/session.ts | 146 ++-- src/table.ts | 46 +- src/transaction.ts | 842 +++++++++++---------- test/session-pool.ts | 93 +++ tsconfig.json | 1 + 17 files changed, 2994 insertions(+), 1236 deletions(-) create mode 100644 observability-test/grpc-instrumentation.ts create mode 100644 observability-test/observability.ts create mode 100644 samples/observability-traces.js create mode 100644 src/instrument.ts diff --git a/README.md b/README.md index 8e88ef2e6..7cb06e8e5 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Google APIs Client Libraries, in [Client Libraries Explained][explained]. * [Before you begin](#before-you-begin) * [Installing the client library](#installing-the-client-library) * [Using the client library](#using-the-client-library) +* [Observability](#observability) * [Samples](#samples) * [Versioning](#versioning) * [Contributing](#contributing) @@ -82,6 +83,101 @@ rows.forEach(row => console.log(row)); ``` +## Observability + +This package has been instrumented with [OpenTelemetry](https://opentelemetry.io/docs/languages/js/) for tracing. Make sure to firstly import and enable +OpenTelemetry before importing this Spanner library. + +Please use a tracer named "nodejs-spanner". + +> :warning: **Make sure that the OpenTelemetry imports are the first, before importing the Spanner library** + +> :warning: **In order for your spans to be annotated with SQL, you MUST opt-in by setting environment variable +`SPANNER_NODEJS_ANNOTATE_PII_SQL=1`, this is because SQL statements can be +sensitive personally-identifiable-information (PII).** + +To test out trace examination, you can use Google Cloud Trace like this. + +```javascript +function exportSpans(instanceId, databaseId, projectId) { + // Firstly initiate OpenTelemetry + const {Resource} = require('@opentelemetry/resources'); + const {NodeSDK} = require('@opentelemetry/sdk-node'); + const {trace} = require('@opentelemetry/api'); + const { + NodeTracerProvider, + TraceIdRatioBasedSampler, + } = require('@opentelemetry/sdk-trace-node'); + const {BatchSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + const { + SEMRESATTRS_SERVICE_NAME, + SEMRESATTRS_SERVICE_VERSION, + } = require('@opentelemetry/semantic-conventions'); + + const resource = Resource.default().merge( + new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'spanner-sample', + [SEMRESATTRS_SERVICE_VERSION]: 'v1.0.0', // Whatever version of your app is running., + }) + ); + + const {TraceExporter} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); + const exporter = new TraceExporter({}); + + const sdk = new NodeSDK({ + resource: resource, + traceExporter: exporter, + // Trace every single request to ensure that we generate + // enough traffic for proper examination of traces. + sampler: new TraceIdRatioBasedSampler(1.0), + }); + sdk.start(); + + const provider = new NodeTracerProvider({resource: resource}); + provider.addSpanProcessor(new BatchSpanProcessor(exporter)); + provider.register(); + + // OpenTelemetry MUST be imported much earlier than the cloud-spanner package. + const tracer = trace.getTracer('nodejs-spanner'); + + const {Spanner} = require('@google-cloud/spanner'); + + tracer.startActiveSpan('deleteAndCreateDatabase', span => { + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + const databaseAdminClient = spanner.getDatabaseAdminClient(); + + const databasePath = databaseAdminClient.databasePath( + projectId, + instanceId, + databaseId + ); + + deleteDatabase(databaseAdminClient, databasePath, () => { + createDatabase( + databaseAdminClient, + projectId, + instanceId, + databaseId, + () => { + span.end(); + console.log('main span.end'); + setTimeout(() => { + console.log('finished delete and creation of the database'); + }, 5000); + } + ); + }); + }); +} +``` + ## Samples @@ -90,6 +186,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-spanner/tre | Sample | Source Code | Try it | | --------------------------- | --------------------------------- | ------ | | Add and drop new database role | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/add-and-drop-new-database-role.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/add-and-drop-new-database-role.js,samples/README.md) | +| Export traces & observability from this library | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/observability-traces.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/observability-traces.js,samples/README.md) | | Backups-cancel | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-cancel.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-cancel.js,samples/README.md) | | Copies a source backup | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-copy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-copy.js,samples/README.md) | | Backups-create-with-encryption-key | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-create-with-encryption-key.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-create-with-encryption-key.js,samples/README.md) | diff --git a/observability-test/grpc-instrumentation.ts b/observability-test/grpc-instrumentation.ts new file mode 100644 index 000000000..6c2bc7f5f --- /dev/null +++ b/observability-test/grpc-instrumentation.ts @@ -0,0 +1,103 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +describe('Enabled gRPC instrumentation with sampling on', () => { + const assert = require('assert'); + const {registerInstrumentations} = require('@opentelemetry/instrumentation'); + const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + const { + InMemorySpanExporter, + NodeTracerProvider, + } = require('@opentelemetry/sdk-trace-node'); + const {GrpcInstrumentation} = require('@opentelemetry/instrumentation-grpc'); + const done = registerInstrumentations({ + instrumentations: [new GrpcInstrumentation()], + }); + + const projectId = process.env.SPANNER_TEST_PROJECTID || 'test-project'; + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + const instance = spanner.instance('test-instance'); + const database = instance.database('test-db'); + + const exporter = new InMemorySpanExporter(); + const provider = new NodeTracerProvider(); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + + beforeEach(async () => { + // Mimick usual customer usage in which at setup time, the + // Spanner and Database handles are created once then sit + // and wait until they service HTTP or gRPC calls that + // come in say 5+ seconds after the service is fully started. + // This gives time for the batch session creation to be be completed. + await new Promise((resolve, reject) => setTimeout(resolve, 100)); + + exporter.reset(); + }); + + after(async () => { + database.close(); + spanner.close(); + await provider.shutdown(); + done(); + }); + + it('Invoking database methods creates spans: gRPC enabled', async () => { + const query = {sql: 'SELECT * FROM INFORMATION_SCHEMA.TABLES'}; + const [rows] = await database.run(query); + assert.ok(rows.length > 1); + + // We need to ensure that spans were generated and exported. + const spans = exporter.getFinishedSpans(); + assert.ok(spans.length > 0, 'at least 1 span must have been created'); + + // Sort the spans by duration, in the natural + // trace view order by longer duration first. + spans.sort((spanA, spanB) => { + return spanA.duration > spanB.duration; + }); + + const got: string[] = []; + spans.forEach(span => { + got.push(span.name); + }); + + const want = ['grpc.google.spanner.v1.Spanner/ExecuteStreamingSql']; + + assert.deepEqual( + want, + got, + 'The spans order by duration has been violated:\n\tGot: ' + + got.toString() + + '\n\tWant: ' + + want.toString() + ); + + // Ensure that each span has the attribute + // SEMATTRS_DB_SYSTEM, set to 'spanner' + spans.forEach(span => { + if (span.name.startsWith('cloud.google.com')) { + assert.equal( + span.attributes[SEMATTRS_DB_SYSTEM], + 'spanner', + 'Invalid DB_SYSTEM attribute' + ); + } + }); + }); +}); diff --git a/observability-test/observability.ts b/observability-test/observability.ts new file mode 100644 index 000000000..020547b47 --- /dev/null +++ b/observability-test/observability.ts @@ -0,0 +1,307 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +const assert = require('assert'); + +const { + AlwaysOffSampler, + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const { + disableContextAndManager, + setGlobalContextManager, + startTrace, + optInForSQLStatementOnSpans, + optOutOfSQLStatementOnSpans, + setTracerProvider, +} = require('../src/instrument'); +const { + SEMATTRS_DB_STATEMENT, + SEMATTRS_DB_SYSTEM, +} = require('@opentelemetry/semantic-conventions'); + +const {ContextManager} = require('@opentelemetry/api'); +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); + +const projectId = process.env.SPANNER_TEST_PROJECTID || 'test-project'; + +describe('Testing spans produced with a sampler on', () => { + const exporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + let provider: typeof NodeTracerProvider; + let contextManager: typeof ContextManager; + + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + const instance = spanner.instance('test-instance'); + const database = instance.database('test-db'); + + beforeEach(() => { + provider = new NodeTracerProvider({ + sampler: sampler, + exporter: exporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + setTracerProvider(provider); + + contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); + }); + + afterEach(async () => { + disableContextAndManager(contextManager); + exporter.forceFlush(); + exporter.reset(); + }); + + after(async () => { + spanner.close(); + await provider.shutdown(); + }); + + it('Invoking database methods creates spans: no gRPC instrumentation', async () => { + const query = {sql: 'SELECT 1'}; + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); + + const spans = exporter.getFinishedSpans(); + // We need to ensure that spans were generated and exported + // correctly. + assert.ok(spans.length > 0, 'at least 1 span must have been created'); + + // Now sort the spans by duration, in reverse magnitude order. + spans.sort((spanA, spanB) => { + return spanA.duration > spanB.duration; + }); + + const got: string[] = []; + spans.forEach(span => { + got.push(span.name); + }); + + const want = [ + 'cloud.google.com/nodejs/spanner/Database.runStream', + 'cloud.google.com/nodejs/spanner/Database.run', + 'cloud.google.com/nodejs/spanner/Transaction.runStream', + ]; + + assert.deepEqual( + want, + got, + 'The spans order by duration has been violated:\n\tGot: ' + + got.toString() + + '\n\tWant: ' + + want.toString() + ); + + // Ensure that each span has the attribute + // SEMATTRS_DB_SYSTEM, set to 'spanner' + spans.forEach(span => { + assert.equal( + span.attributes[SEMATTRS_DB_SYSTEM], + 'spanner', + 'Missing DB_SYSTEM attribute' + ); + }); + }); + + const methodsTakingSQL = { + 'cloud.google.com/nodejs/spanner/Database.run': true, + 'cloud.google.com/nodejs/spanner/Transaction.runStream': true, + }; + + it('Opt-ing into PII-risk SQL annotation on spans works', async () => { + optInForSQLStatementOnSpans(); + + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + const instance = spanner.instance('test-instance'); + const database = instance.database('test-db'); + + const query = {sql: 'SELECT CURRENT_TIMESTAMP()'}; + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); + + const spans = exporter.getFinishedSpans(); + // We need to ensure that spans were generated and exported + // correctly. + assert.ok(spans.length > 0, 'at least 1 span must have been created'); + + // Ensure that each span has the attribute + // SEMATTRS_DB_SYSTEM, set to 'spanner' + spans.forEach(span => { + if (!methodsTakingSQL[span.name]) { + return; + } + + const got = span.attributes[SEMATTRS_DB_STATEMENT]; + const want = query.sql; + assert.strictEqual( + got, + want, + `${span.name} has Invalid DB_STATEMENT attribute\n\tGot: ${got}\n\tWant: ${want}` + ); + }); + }); + + it('Closing the client creates the closing span', () => { + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + spanner.close(); + + const spans = exporter.getFinishedSpans(); + // We need to ensure that spans were generated and exported + // correctly. + assert.ok(spans.length == 1, 'exactly 1 span must have been created'); + assert.strictEqual( + spans[0].name, + 'cloud.google.com/nodejs/spanner/Spanner.close' + ); + }); +}); + +describe('Capturing sessionPool annotations', () => { + const exporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: exporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + setTracerProvider(provider); + + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); + + after(async () => { + exporter.forceFlush(); + exporter.reset(); + disableContextAndManager(contextManager); + await provider.shutdown(); + }); + + it('Check for annotations', () => { + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + const instance = spanner.instance('test-instance'); + const database = instance.database('test-db'); + + startTrace('observabilityTest', {}, async(udtSpan) => { + const query = {sql: 'SELECT CURRENT_TIMESTAMP()'}; + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); + + database.close(); + spanner.close(); + udtSpan.end(); + + exporter.forceFlush(); + + const spans = exporter.getFinishedSpans(); + assert.ok(spans.length >= 1, 'at least 1 span should be exported'); + + spans.forEach(span => { + console.log('eSpan', span.name); + }); + + const span0 = spans[0]; + assert.ok( + span0.events.length > 0, + 'at least one event should have been added' + ); + span0.events.forEach(event => { + console.log('event', event.toString()); + }); + + console.log('done here'); + }); + }); +}); + +describe('Always off sampler used', () => { + const exporter = new InMemorySpanExporter(); + const sampler = new AlwaysOffSampler(); + + let provider: typeof NodeTracerProvider; + let contextManager: typeof ContextManager; + + const {Spanner} = require('../src'); + const spanner = new Spanner({ + projectId: projectId, + }); + const instance = spanner.instance('test-instance'); + const database = instance.database('test-db'); + + beforeEach(() => { + provider = new NodeTracerProvider({ + sampler: sampler, + exporter: exporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + setTracerProvider(provider); + + contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); + }); + + afterEach(async () => { + disableContextAndManager(contextManager); + exporter.forceFlush(); + exporter.reset(); + }); + + after(async () => { + spanner.close(); + await provider.shutdown(); + }); + + it('Querying with gRPC enabled', async () => { + const query = {sql: 'SELECT 1'}; + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 0, 'no spans should be exported'); + }); + + it('Opt-ing into PII-risk SQL annotation', async () => { + optInForSQLStatementOnSpans(); + + const query = {sql: 'SELECT CURRENT_TIMESTAMP()'}; + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 0, 'no spans must be created'); + }); +}); diff --git a/package.json b/package.json index cbab89390..0783d213b 100644 --- a/package.json +++ b/package.json @@ -33,8 +33,9 @@ "samples-test-with-archived": "cd samples/ && npm link ../ && npm test-with-archived && cd ../", "samples-test": "cd samples/ && npm link ../ && npm test && cd ../", "system-test": "mocha build/system-test --timeout 1600000", + "observability-test": "mocha build/observability-test --timeout 1600000", "cleanup": "mocha scripts/cleanup.js --timeout 30000", - "test": "mocha build/test build/test/common", + "test": "mocha build/test build/test/common build/observability-test", "ycsb": "node ./benchmark/ycsb.js run -P ./benchmark/workloada -p table=usertable -p cloudspanner.instance=ycsb-instance -p operationcount=100 -p cloudspanner.database=ycsb", "fix": "gts fix", "clean": "gts clean", @@ -42,6 +43,7 @@ "prepare": "npm run compile-protos && npm run compile", "pretest": "npm run compile", "presystem-test": "npm run compile", + "preobservability-test": "npm run compile", "proto": "compileProtos src", "docs-test": "linkinator docs", "predocs-test": "npm run docs", @@ -56,7 +58,17 @@ "@google-cloud/precise-date": "^4.0.0", "@google-cloud/projectify": "^4.0.0", "@google-cloud/promisify": "^4.0.0", + "@grpc/grpc-js": "^1.10.11", "@grpc/proto-loader": "^0.7.0", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/instrumentation": "^0.52.1", + "@opentelemetry/instrumentation-grpc": "^0.52.1", + "@opentelemetry/instrumentation-http": "^0.52.1", + "@opentelemetry/resources": "^1.25.1", + "@opentelemetry/sdk-node": "^0.52.1", + "@opentelemetry/sdk-trace-base": "^1.25.1", + "@opentelemetry/sdk-trace-node": "^1.25.1", + "@opentelemetry/semantic-conventions": "^1.25.1", "@types/big.js": "^6.0.0", "@types/stack-trace": "0.0.33", "arrify": "^2.0.0", diff --git a/samples/observability-traces.js b/samples/observability-traces.js new file mode 100644 index 000000000..b66d3b4e8 --- /dev/null +++ b/samples/observability-traces.js @@ -0,0 +1,464 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +async function exportSpans(instanceId, databaseId, projectId) { + // [START spanner_export_traces] + // Imports the Google Cloud client library and OpenTelemetry libraries for exporting traces. + const {Resource} = require('@opentelemetry/resources'); + const {NodeSDK} = require('@opentelemetry/sdk-node'); + const {trace} = require('@opentelemetry/api'); + const { + NodeTracerProvider, + TraceIdRatioBasedSampler, + } = require('@opentelemetry/sdk-trace-node'); + const { + BatchSpanProcessor, + ConsoleSpanExporter, + SimpleSpanProcessor, + } = require('@opentelemetry/sdk-trace-base'); + const { + SEMRESATTRS_SERVICE_NAME, + SEMRESATTRS_SERVICE_VERSION, + } = require('@opentelemetry/semantic-conventions'); + + const resource = Resource.default().merge( + new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'spanner-sample', + [SEMRESATTRS_SERVICE_VERSION]: 'v1.0.0', // Whatever version of your app is running., + }) + ); + + const options = {serviceName: 'nodejs-spanner'}; + const { + TraceExporter, + } = require('@google-cloud/opentelemetry-cloud-trace-exporter'); + const exporter = new TraceExporter({}); + // const {ZipkinExporter} = require('@opentelemetry/exporter-zipkin'); + // const exporter = new ZipkinExporter({}); + + const {registerInstrumentations} = require('@opentelemetry/instrumentation'); + const {GrpcInstrumentation} = require('@opentelemetry/instrumentation-grpc'); + const {HttpInstrumentation} = require('@opentelemetry/instrumentation-http'); + registerInstrumentations({ + instrumentations: [new GrpcInstrumentation(), new HttpInstrumentation()], + }); + + const sdk = new NodeSDK({ + resource: resource, + traceExporter: exporter, + // Trace every single request to ensure that we generate + // enough traffic for proper examination of traces. + sampler: new TraceIdRatioBasedSampler(1.0), + }); + sdk.start(); + + const provider = new NodeTracerProvider({resource: resource}); + provider.addSpanProcessor(new BatchSpanProcessor(exporter)); + provider.register(); + + const tracer = provider.getTracer('nodejs-spanner'); + + const {Spanner} = require('@google-cloud/spanner'); + + /** + * TODO(developer): Uncomment the following lines before running the sample. + */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + observability: { + tracerProvider: provider, + enableExtendedTracing: true, + }, + }); + + // Gets a reference to a Cloud Spanner instance and database + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + const databaseAdminClient = spanner.getDatabaseAdminClient(); + + const databasePath = databaseAdminClient.databasePath( + projectId, + instanceId, + databaseId + ); + + await new Promise((resolve, reject) => setTimeout(resolve, 5000)); + + tracer.startActiveSpan('deleteAndCreateDatabase', span => { + deleteDatabase(databaseAdminClient, databasePath, () => { + createDatabase( + databaseAdminClient, + projectId, + instanceId, + databaseId, + () => { + insertUsingDml(tracer, database, async () => { + console.log('main span.end'); + + try { + const query = { + sql: 'SELECT SingerId, FirstName, LastName FROM Singers', + }; + const [rows] = await database.run(query); + + for (const row of rows) { + const json = row.toJSON(); + + console.log( + `SingerId: ${json.SingerId}, FirstName: ${json.FirstName}, LastName: ${json.LastName}` + ); + } + } catch (err) { + console.error('ERROR:', err); + } + + spanner.close(); + span.end(); + // await new Promise((resolve, reject) => setTimeout(resolve, 600)); + // await exporter.forceFlush(); + await new Promise((resolve, reject) => + setTimeout(() => { + console.log('finished delete and creation of the database'); + }, 10000) + ); + }); + } + ); + }); + }); + + // [END spanner_export_traces] +} + +function quickstart() {} + +function createDropIndices( + tracer, + databaseAdminClient, + database, + databasePath +) { + async function createIndex(tracer, callback) { + const span = tracer.startSpan('createIndex'); + const request = ['CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)']; + + // Creates a new index in the database + try { + const [operation] = await databaseAdminClient.updateDatabaseDdl({ + database: databasePath, + statements: request, + }); + + console.log('Waiting for operation to complete...'); + await operation.promise(); + + console.log('Added the AlbumsByAlbumTitle index.'); + spanner.close(); + span.end(); + } catch (err) { + console.error('ERROR:', err); + dropIndex(() => {}); + } finally { + span.end(); + } + } + + async function dropIndex( + tracer, + databaseAdminClient, + callback, + databasePath + ) { + const span = tracer.startSpan('dropIndex'); + const request = ['DROP INDEX AlbumsByAlbumTitle']; + + // Creates a new index in the database + try { + const [operation] = await databaseAdminClient.updateDatabaseDdl({ + database: databasePath, + statements: request, + }); + + console.log('Waiting for operation to complete...'); + await operation.promise(); + span.end(); + console.log('Added the AlbumsByAlbumTitle index.'); + } catch (err) { + console.error('ERROR:', err); + createIndex(tracer, () => {}); + } finally { + setTimeout(() => { + callback(); + }, 5000); + } + } + + // Gets a transaction object that captures the database state + // at a specific point in time + tracer.startActiveSpan('runOperations', span => { + createIndex(tracer, () => { + setTimeout(() => { + span.end(); + }, 10000); + }); + }); +} + +function runMutations(tracer, database) { + const {MutationGroup} = require('@google-cloud/spanner'); + + // Create Mutation Groups + /** + * Related mutations should be placed in a group, such as insert mutations for both a parent and a child row. + * A group must contain related mutations. + * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup} + * for more details and examples. + */ + tracer.startActiveSpan('runMutations', span => { + const mutationGroup1 = new MutationGroup(); + mutationGroup1.insert('Singers', { + SingerId: 1, + FirstName: 'Scarlet', + LastName: 'Terry', + }); + + const mutationGroup2 = new MutationGroup(); + mutationGroup2.insert('Singers', { + SingerId: 2, + FirstName: 'Marc', + }); + mutationGroup2.insert('Singers', { + SingerId: 3, + FirstName: 'Catalina', + LastName: 'Smith', + }); + mutationGroup2.insert('Albums', { + AlbumId: 1, + SingerId: 2, + AlbumTitle: 'Total Junk', + }); + mutationGroup2.insert('Albums', { + AlbumId: 2, + SingerId: 3, + AlbumTitle: 'Go, Go, Go', + }); + + const options = { + transactionTag: 'batch-write-tag', + }; + + try { + database + .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options) + .on('error', console.error) + .on('data', response => { + // Check the response code of each response to determine whether the mutation group(s) were applied successfully. + if (response.status.code === 0) { + console.log( + `Mutation group indexes ${ + response.indexes + }, have been applied with commit timestamp ${Spanner.timestamp( + response.commitTimestamp + ).toJSON()}` + ); + } + // Mutation groups that fail to commit trigger a response with a non-zero status code. + else { + console.log( + `Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}` + ); + } + }) + .on('end', () => { + console.log('Request completed successfully'); + database.close(); + span.end(); + }); + } catch (err) { + console.log(err); + span.end(); + } finally { + setTimeout(() => { + database.close(); + span.end(); + }, 8000); + } + }); +} + +function insertUsingDml(tracer, database, callback) { + tracer.startActiveSpan('insertUsingDML', span => { + database.runTransaction(async (err, transaction) => { + if (err) { + span.end(); + console.error(err); + return; + } + try { + const [delCount] = await transaction.runUpdate({ + sql: 'DELETE FROM Singers WHERE 1=1', + }); + + console.log(`Deletion count ${delCount}`); + + const [rowCount] = await transaction.runUpdate({ + sql: 'INSERT Singers (SingerId, FirstName, LastName) VALUES (10, @firstName, @lastName)', + params: { + firstName: 'Virginia', + lastName: 'Watson', + }, + }); + + console.log( + `Successfully inserted ${rowCount} record into the Singers table.` + ); + + await transaction.commit(); + } catch (err) { + console.error('ERROR:', err); + } finally { + // Close the database when finished. + console.log('exiting insertUsingDml'); + tracer.startActiveSpan('timingOutToExport-insertUsingDML', eSpan => { + setTimeout(() => { + eSpan.end(); + span.end(); + if (callback) { + callback(); + } + }, 50); + }); + } + }); + }); +} + +function createTableWithForeignKeyDeleteCascade( + databaseAdminClient, + databasePath +) { + const requests = [ + 'DROP TABLE Customers', + 'DROP TABLE ShoppingCarts', + `CREATE TABLE Customers ( + CustomerId INT64, + CustomerName STRING(62) NOT NULL + ) PRIMARY KEY (CustomerId)`, + `CREATE TABLE ShoppingCarts ( + CartId INT64 NOT NULL, + CustomerId INT64 NOT NULL, + CustomerName STRING(62) NOT NULL, + CONSTRAINT FKShoppingCartsCustomerId FOREIGN KEY (CustomerId) + REFERENCES Customers (CustomerId) ON DELETE CASCADE, + ) PRIMARY KEY (CartId)`, + ]; + + async function doDDL() { + const [operation] = await databaseAdminClient.updateDatabaseDdl({ + database: databasePath, + statements: requests, + }); + + console.log( + 'Waiting for createTableWithForeignKeyDeleteCasscae operation...' + ); + await operation.promise(); + } + + doDDL(); +} + +function createDatabase( + databaseAdminClient, + projectId, + instanceId, + databaseId, + callback +) { + async function doCreateDatabase() { + if (databaseId) { + callback(); + return; + } + + // Create the database with default tables. + const createSingersTableStatement = ` + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + FirstName STRING(1024), + LastName STRING(1024), + SingerInfo BYTES(MAX) + ) PRIMARY KEY (SingerId)`; + const createAlbumsStatement = ` + CREATE TABLE Albums ( + SingerId INT64 NOT NULL, + AlbumId INT64 NOT NULL, + AlbumTitle STRING(MAX) + ) PRIMARY KEY (SingerId, AlbumId), + INTERLEAVE IN PARENT Singers ON DELETE CASCADE`; + + const [operation] = await databaseAdminClient.createDatabase({ + createStatement: 'CREATE DATABASE `' + databaseId + '`', + extraStatements: [createSingersTableStatement, createAlbumsStatement], + parent: databaseAdminClient.instancePath(projectId, instanceId), + }); + + console.log(`Waiting for creation of ${databaseId} to complete...`); + await operation.promise(); + console.log(`Created database ${databaseId}`); + callback(); + } + doCreateDatabase(); +} + +function deleteDatabase(databaseAdminClient, databasePath, callback) { + async function doDropDatabase() { + if (databasePath) { + callback(); + return; + } + + const [operation] = await databaseAdminClient.dropDatabase({ + database: databasePath, + }); + + await operation; + console.log('Finished dropping the database'); + callback(); + } + + doDropDatabase(); +} + +require('yargs') + .demand(1) + .command( + 'export ', + 'Execute a read-only transaction on an example Cloud Spanner table.', + {}, + opts => exportSpans(opts.instanceName, opts.databaseName, opts.projectId) + ) + .example('node $0 export "my-instance" "my-database" "my-project-id"') + .wrap(120) + .recommendCommands() + .epilogue('For more information, see https://cloud.google.com/spanner/docs') + .strict() + .help().argv; diff --git a/samples/package.json b/samples/package.json index d0712247f..3eec28f50 100644 --- a/samples/package.json +++ b/samples/package.json @@ -16,10 +16,26 @@ }, "dependencies": { "@google-cloud/kms": "^4.0.0", + "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.3.0", "@google-cloud/precise-date": "^4.0.0", "@google-cloud/spanner": "^7.12.0", - "yargs": "^17.0.0", - "protobufjs": "^7.0.0" + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/auto-instrumentations-node": "^0.47.1", + "@opentelemetry/exporter-jaeger": "^1.25.1", + "@opentelemetry/exporter-zipkin": "^1.25.1", + "@opentelemetry/instrumentation": "^0.52.1", + "@opentelemetry/instrumentation-grpc": "^0.52.1", + "@opentelemetry/instrumentation-http": "^0.52.1", + "@opentelemetry/propagator-ot-trace": "^0.27.2", + "@opentelemetry/resources": "^1.25.1", + "@opentelemetry/sdk-metrics": "^1.25.1", + "@opentelemetry/sdk-node": "^0.52.1", + "@opentelemetry/sdk-trace-base": "^1.25.1", + "@opentelemetry/sdk-trace-node": "^1.25.1", + "@opentelemetry/sdk-trace-web": "^1.25.1", + "@opentelemetry/semantic-conventions": "^1.25.1", + "protobufjs": "^7.0.0", + "yargs": "^17.0.0" }, "devDependencies": { "chai": "^4.2.0", diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 842a82cdc..48025267e 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -25,6 +25,7 @@ import { CLOUD_RESOURCE_HEADER, addLeaderAwareRoutingHeader, } from '../src/common'; +import {startTrace, setSpanError} from './instrument'; export interface TransactionIdentifier { session: string | Session; @@ -136,21 +137,30 @@ class BatchTransaction extends Snapshot { delete reqOpts.gaxOptions; delete reqOpts.types; - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + return startTrace('BatchTransaction.createQueryPartitions', query, span => { + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionQuery', + reqOpts, + gaxOpts: query.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionQuery', - reqOpts, - gaxOpts: query.gaxOptions, - headers: headers, - }, - callback - ); + span.end(); + callback(err, partitions, resp); + } + ); + }); } /** * Generic create partition method. Handles common parameters used in both @@ -163,37 +173,43 @@ class BatchTransaction extends Snapshot { * @param {function} callback Callback function. */ createPartitions_(config, callback) { - const query = extend({}, config.reqOpts, { - session: this.session.formattedName_, - transaction: {id: this.id}, - }); - config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, - }; - delete query.partitionOptions; - this.session.request(config, (err, resp) => { - if (err) { - callback(err, null, resp); - return; - } - - const partitions = resp.partitions.map(partition => { - return extend({}, query, partition); + return startTrace('BatchTransaction.createPartitions', {}, span => { + const query = extend({}, config.reqOpts, { + session: this.session.formattedName_, + transaction: {id: this.id}, }); + config.reqOpts = extend({}, query); + config.headers = { + [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) + .formattedName_, + }; + delete query.partitionOptions; + this.session.request(config, (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err, null, resp); + return; + } - if (resp.transaction) { - const {id, readTimestamp} = resp.transaction; + const partitions = resp.partitions.map(partition => { + return extend({}, query, partition); + }); - this.id = id; + if (resp.transaction) { + const {id, readTimestamp} = resp.transaction; - if (readTimestamp) { - this.readTimestampProto = readTimestamp; - this.readTimestamp = new PreciseDate(readTimestamp); + this.id = id; + + if (readTimestamp) { + this.readTimestampProto = readTimestamp; + this.readTimestamp = new PreciseDate(readTimestamp); + } } - } - callback(null, partitions, resp); + span.end(); + callback(null, partitions, resp); + }); }); } /** @@ -226,29 +242,38 @@ class BatchTransaction extends Snapshot { * @returns {Promise} */ createReadPartitions(options, callback) { - const reqOpts = Object.assign({}, options, { - keySet: Snapshot.encodeKeySet(options), - }); + return startTrace('BatchTransaction.createReadPartitions', {}, span => { + const reqOpts = Object.assign({}, options, { + keySet: Snapshot.encodeKeySet(options), + }); - delete reqOpts.gaxOptions; - delete reqOpts.keys; - delete reqOpts.ranges; + delete reqOpts.gaxOptions; + delete reqOpts.keys; + delete reqOpts.ranges; - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionRead', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionRead', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - callback - ); + span.end(); + callback(err, partitions, resp); + } + ); + }); } /** * Executes partition. @@ -322,6 +347,7 @@ class BatchTransaction extends Snapshot { * ``` */ executeStream(partition) { + // TODO: Instrument the streams with Otel. if (is.string(partition.table)) { return this.createReadStream(partition.table, partition); } diff --git a/src/database.ts b/src/database.ts index b3b38a239..7eb9fdfb1 100644 --- a/src/database.ts +++ b/src/database.ts @@ -102,6 +102,7 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); +import {startTrace, setSpanError} from './instrument'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -399,6 +400,7 @@ class Database extends common.GrpcServiceObject { options: CreateDatabaseOptions, callback: CreateDatabaseCallback ) => { + // TODO: Instrument this method with OpenTelemetry. const pool = this.pool_ as SessionPool; if (pool._pending > 0) { // If there are BatchCreateSessions requests pending, then we should @@ -648,48 +650,56 @@ class Database extends common.GrpcServiceObject { options: number | BatchCreateSessionsOptions, callback?: BatchCreateSessionsCallback ): void | Promise { - if (typeof options === 'number') { - options = {count: options}; - } + return startTrace('Database.batchCreateSessions', {}, span => { + if (typeof options === 'number') { + options = {count: options}; + } - const count = options.count; - const labels = options.labels || {}; - const databaseRole = options.databaseRole || this.databaseRole || null; + const count = options.count; + span.setAttribute('session.count.requested', count); + const labels = options.labels || {}; + const databaseRole = options.databaseRole || this.databaseRole || null; - const reqOpts: google.spanner.v1.IBatchCreateSessionsRequest = { - database: this.formattedName_, - sessionTemplate: {labels: labels, creatorRole: databaseRole}, - sessionCount: count, - }; + const reqOpts: google.spanner.v1.IBatchCreateSessionsRequest = { + database: this.formattedName_, + sessionTemplate: {labels: labels, creatorRole: databaseRole}, + sessionCount: count, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'batchCreateSessions', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback!(err, null, resp!); - return; - } + this.request( + { + client: 'SpannerClient', + method: 'batchCreateSessions', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } - const sessions = (resp!.session || []).map(metadata => { - const session = this.session(metadata.name!); - session.metadata = metadata; - return session; - }); + const sessions = (resp!.session || []).map(metadata => { + const session = this.session(metadata.name!); + session.metadata = metadata; + return session; + }); - callback!(null, sessions, resp!); - } - ); + span.setAttribute('session.count.created', sessions.length); + + span.end(); + callback!(null, sessions, resp!); + } + ); + }); } /** @@ -2013,38 +2023,46 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | GetSnapshotCallback, cb?: GetSnapshotCallback ): void | Promise<[Snapshot]> { - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as GetSnapshotCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError); - return; - } - - const snapshot = session!.snapshot(options, this.queryOptions_); - - snapshot.begin(err => { + return startTrace('Database.getSnapshot', {}, span => { + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as GetSnapshotCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - session!.lastError = err; - this.pool_.release(session!); - this.getSnapshot(options, callback!); - } else { - this.pool_.release(session!); - callback!(err); - } + setSpanError(span, err); + span.end(); + callback!(err as ServiceError); return; } - this._releaseOnEnd(session!, snapshot); - callback!(err, snapshot); + const snapshot = session!.snapshot(options, this.queryOptions_); + + snapshot.begin(err => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + session!.lastError = err; + this.pool_.release(session!); + this.getSnapshot(options, callback!); + span.end(); + } else { + this.pool_.release(session!); + span.end(); + callback!(err); + } + return; + } + + this._releaseOnEnd(session!, snapshot); + span.end(); + callback!(err, snapshot); + }); }); }); } @@ -2698,32 +2716,38 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | RunCallback, cb?: RunCallback ): void | Promise { - let stats: ResultSetStats; - let metadata: ResultSetMetadata; - const rows: Row[] = []; - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as RunCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.runStream(query, options) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - } - }) - .on('stats', _stats => (stats = _stats)) - .on('data', row => { - rows.push(row); - }) - .on('end', () => { - callback!(null, rows, stats, metadata); - }); + startTrace('Database.run', {sql: query}, span => { + let stats: ResultSetStats; + let metadata: ResultSetMetadata; + const rows: Row[] = []; + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as RunCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.runStream(query, options) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + } + }) + .on('stats', _stats => (stats = _stats)) + .on('data', row => { + rows.push(row); + }) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** * Partitioned DML transactions are used to execute DML statements with a @@ -2923,54 +2947,65 @@ class Database extends common.GrpcServiceObject { query: string | ExecuteSqlRequest, options?: TimestampBounds ): PartialResultStream { - const proxyStream: Transform = through.obj(); + return startTrace('Database.runStream', {}, span => { + const proxyStream: Transform = through.obj(); - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; + } - const snapshot = session!.snapshot(options, this.queryOptions_); - - this._releaseOnEnd(session!, snapshot); - - let dataReceived = false; - let dataStream = snapshot.runStream(query); - const endListener = () => snapshot.end(); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If it is a 'Session not found' error and we have not yet received - // any data, we can safely retry the query on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; + const snapshot = session!.snapshot(options, this.queryOptions_); + + this._releaseOnEnd(session!, snapshot); + + let dataReceived = false; + let dataStream = snapshot.runStream(query); + const endListener = () => snapshot.end(); + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + setSpanError(span, err); + + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If it is a 'Session not found' error and we have not yet received + // any data, we can safely retry the query on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.removeListener('end', endListener); + dataStream.end(); + snapshot.end(); + // Create a new data stream and add it to the end user stream. + dataStream = this.runStream(query, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + snapshot.end(); } - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.removeListener('end', endListener); - dataStream.end(); - snapshot.end(); - // Create a new data stream and add it to the end user stream. - dataStream = this.runStream(query, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); - snapshot.end(); - } - }) - .on('stats', stats => proxyStream.emit('stats', stats)) - .on('response', response => proxyStream.emit('response', response)) - .once('end', endListener) - .pipe(proxyStream); - }); - return proxyStream as PartialResultStream; + span.end(); + }) + .on('stats', stats => proxyStream.emit('stats', stats)) + .on('response', response => proxyStream.emit('response', response)) + .once('end', () => { + span.end(); + endListener(); + }) + .pipe(proxyStream); + }); + + return proxyStream as PartialResultStream; + }); } /** @@ -3071,47 +3106,65 @@ class Database extends common.GrpcServiceObject { optionsOrRunFn: RunTransactionOptions | RunTransactionCallback, fn?: RunTransactionCallback ): void { - const runFn = - typeof optionsOrRunFn === 'function' - ? (optionsOrRunFn as RunTransactionCallback) - : fn; - const options = - typeof optionsOrRunFn === 'object' && optionsOrRunFn - ? (optionsOrRunFn as RunTransactionOptions) - : {}; - - this.pool_.getSession((err, session?, transaction?) => { - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - this.runTransaction(options, runFn!); - return; - } - if (err) { - runFn!(err as grpc.ServiceError); - return; - } - if (options.optimisticLock) { - transaction!.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction!.excludeTxnFromChangeStreams(); - } - - const release = this.pool_.release.bind(this.pool_, session!); - const runner = new TransactionRunner( - session!, - transaction!, - runFn!, - options - ); + startTrace('Database.runTransaction', {}, span => { + const runFn = + typeof optionsOrRunFn === 'function' + ? (optionsOrRunFn as RunTransactionCallback) + : fn; + const options = + typeof optionsOrRunFn === 'object' && optionsOrRunFn + ? (optionsOrRunFn as RunTransactionOptions) + : {}; + + this.pool_.getSession((err, session?, transaction?) => { + if (err) { + setSpanError(span, err); + } - runner.run().then(release, err => { - if (isSessionNotFoundError(err)) { - release(); + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { + span.end(); this.runTransaction(options, runFn!); - } else { - setImmediate(runFn!, err); - release(); + return; + } + + if (err) { + span.end(); + runFn!(err as grpc.ServiceError); + return; + } + if (options.optimisticLock) { + transaction!.useOptimisticLock(); } + if (options.excludeTxnFromChangeStreams) { + transaction!.excludeTxnFromChangeStreams(); + } + + const release = () => { + span.end(); + this.pool_.release(session!); + }; + + const runner = new TransactionRunner( + session!, + transaction!, + runFn!, + options + ); + + runner.run().then(release, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + + if (isSessionNotFoundError(err)) { + release(); + this.runTransaction(options, runFn!); + } else { + setImmediate(runFn!, err); + release(); + } + }); }); }); } @@ -3294,59 +3347,69 @@ class Database extends common.GrpcServiceObject { mutationGroups: MutationGroup[], options?: BatchWriteOptions ): NodeJS.ReadableStream { - const proxyStream: Transform = through.obj(); + return startTrace('Database.batchWriteAtLeastOnce', {}, span => { + const proxyStream: Transform = through.obj(); - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } - const gaxOpts = extend(true, {}, options?.gaxOptions); - const reqOpts = Object.assign( - {} as spannerClient.spanner.v1.BatchWriteRequest, - { - session: session!.formattedName_!, - mutationGroups: mutationGroups.map(mg => mg.proto()), - requestOptions: options?.requestOptions, - excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; } - ); - let dataReceived = false; - let dataStream = this.requestStream({ - client: 'SpannerClient', - method: 'batchWrite', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If there's a 'Session not found' error and we have not yet received - // any data, we can safely retry the writes on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; - } - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.end(); - // Create a new stream and add it to the end user stream. - dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); + const gaxOpts = extend(true, {}, options?.gaxOptions); + const reqOpts = Object.assign( + {} as spannerClient.spanner.v1.BatchWriteRequest, + { + session: session!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, } - }) - .once('end', () => this.pool_.release(session!)) - .pipe(proxyStream); - }); + ); + let dataReceived = false; + let dataStream = this.requestStream({ + client: 'SpannerClient', + method: 'batchWrite', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }); + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If there's a 'Session not found' error and we have not yet received + // any data, we can safely retry the writes on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.end(); + // Create a new stream and add it to the end user stream. + dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + } - return proxyStream as NodeJS.ReadableStream; + setSpanError(span, err); + span.end(); + }) + .once('end', () => { + this.pool_.release(session!); + span.end(); + }) + .pipe(proxyStream); + }); + + return proxyStream as NodeJS.ReadableStream; + }); } /** diff --git a/src/index.ts b/src/index.ts index f6eedf081..c24e92f7b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -80,6 +80,14 @@ import { import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; +import { + ObservabilityOptions, + applyObservabilityOptions, + getActiveOrNoopSpan, + startTrace, + setSpanError, + setTracerProvider, +} from './instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -138,7 +146,9 @@ export interface SpannerOptions extends GrpcClientOptions { sslCreds?: grpc.ChannelCredentials; routeToLeaderEnabled?: boolean; directedReadOptions?: google.spanner.v1.IDirectedReadOptions | null; + observabilityConfig?: ObservabilityOptions | undefined; } + export interface RequestConfig { client: string; method: string; @@ -300,6 +310,7 @@ class Spanner extends GrpcService { } } } + options = Object.assign( { libName: 'gccl', @@ -332,6 +343,7 @@ class Spanner extends GrpcService { options.port = emulatorHost.port; options.sslCreds = grpc.credentials.createInsecure(); } + const config = { baseUrl: options.apiEndpoint || @@ -365,6 +377,7 @@ class Spanner extends GrpcService { [CLOUD_RESOURCE_HEADER]: this.projectFormattedName_, }; this.directedReadOptions = directedReadOptions; + applyObservabilityOptions(options.observabilityConfig); } /** @@ -417,13 +430,16 @@ class Spanner extends GrpcService { /** Closes this Spanner client and cleans up all resources used by it. */ close(): void { - this.clients_.forEach(c => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const client = c as any; - if (client.operationsClient && client.operationsClient.close) { - client.operationsClient.close(); - } - client.close(); + startTrace('Spanner.close', {}, span => { + this.clients_.forEach(c => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const client = c as any; + if (client.operationsClient && client.operationsClient.close) { + client.operationsClient.close(); + } + client.close(); + }); + span.end(); }); } @@ -530,64 +546,78 @@ class Spanner extends GrpcService { config: CreateInstanceRequest, callback?: CreateInstanceCallback ): void | Promise { - if (!name) { - throw new GoogleError('A name is required to create an instance.'); - } - if (!config) { - throw new GoogleError( - ['A configuration object is required to create an instance.'].join('') - ); - } - const formattedName = Instance.formatName_(this.projectId, name); - const displayName = config.displayName || formattedName.split('/').pop(); - const reqOpts = { - parent: this.projectFormattedName_, - instanceId: formattedName.split('/').pop(), - instance: extend( - { - name: formattedName, - displayName, - nodeCount: config.nodes, - processingUnits: config.processingUnits, - }, - config - ), - }; - - if (reqOpts.instance.nodeCount && reqOpts.instance.processingUnits) { - throw new GoogleError( - ['Only one of nodeCount or processingUnits can be specified.'].join('') - ); - } - if (!reqOpts.instance.nodeCount && !reqOpts.instance.processingUnits) { - // If neither nodes nor processingUnits are specified, default to a - // nodeCount of 1. - reqOpts.instance.nodeCount = 1; - } + return startTrace('SpannerClient.createInstance', {}, span => { + if (!name) { + const msg = 'A name is required to create an instance'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError(msg); + } + if (!config) { + const msg = 'A configuration object is required to create an instance'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError([msg].join('')); + } + const formattedName = Instance.formatName_(this.projectId, name); + const displayName = config.displayName || formattedName.split('/').pop(); + const reqOpts = { + parent: this.projectFormattedName_, + instanceId: formattedName.split('/').pop(), + instance: extend( + { + name: formattedName, + displayName, + nodeCount: config.nodes, + processingUnits: config.processingUnits, + }, + config + ), + }; + + if (reqOpts.instance.nodeCount && reqOpts.instance.processingUnits) { + const msg = + 'Only one of nodeCount or processingUnits can be specified.'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError(msg); + } + if (!reqOpts.instance.nodeCount && !reqOpts.instance.processingUnits) { + // If neither nodes nor processingUnits are specified, default to a + // nodeCount of 1. + reqOpts.instance.nodeCount = 1; + } - delete reqOpts.instance.nodes; - delete reqOpts.instance.gaxOptions; + delete reqOpts.instance.nodes; + delete reqOpts.instance.gaxOptions; - if (config.config!.indexOf('/') === -1) { - reqOpts.instance.config = `projects/${this.projectId}/instanceConfigs/${config.config}`; - } - this.request( - { - client: 'InstanceAdminClient', - method: 'createInstance', - reqOpts, - gaxOpts: config.gaxOptions, - headers: this.resourceHeader_, - }, - (err, operation, resp) => { - if (err) { - callback!(err, null, null, resp); - return; - } - const instance = this.instance(formattedName); - callback!(null, instance, operation, resp); + if (config.config!.indexOf('/') === -1) { + reqOpts.instance.config = `projects/${this.projectId}/instanceConfigs/${config.config}`; } - ); + this.request( + { + client: 'InstanceAdminClient', + method: 'createInstance', + reqOpts, + gaxOpts: config.gaxOptions, + headers: this.resourceHeader_, + }, + (err, operation, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, null, resp); + return; + } + const instance = this.instance(formattedName); + span.end(); + callback!(null, instance, operation, resp); + } + ); + }); } /** @@ -684,61 +714,67 @@ class Spanner extends GrpcService { optionsOrCallback?: GetInstancesOptions | GetInstancesCallback, cb?: GetInstancesCallback ): Promise | void { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const self = this; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetInstancesOptions); - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - - const gaxOpts = extend(true, {}, options.gaxOptions); + return startTrace('SpannerClient.getInstances', {}, span => { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetInstancesOptions); + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + + const gaxOpts = extend(true, {}, options.gaxOptions); + + let reqOpts = extend({}, options, { + parent: 'projects/' + this.projectId, + }); - let reqOpts = extend({}, options, { - parent: 'projects/' + this.projectId, - }); + delete reqOpts.gaxOptions; - delete reqOpts.gaxOptions; + // Copy over pageSize and pageToken values from gaxOptions. + // However values set on options take precedence. + if (gaxOpts) { + reqOpts = extend( + {}, + { + pageSize: (gaxOpts as GetInstancesOptions).pageSize, + pageToken: (gaxOpts as GetInstancesOptions).pageToken, + }, + reqOpts + ); + delete (gaxOpts as GetInstancesOptions).pageToken; + delete (gaxOpts as GetInstancesOptions).pageSize; + } - // Copy over pageSize and pageToken values from gaxOptions. - // However values set on options take precedence. - if (gaxOpts) { - reqOpts = extend( - {}, + this.request( { - pageSize: (gaxOpts as GetInstancesOptions).pageSize, - pageToken: (gaxOpts as GetInstancesOptions).pageToken, + client: 'InstanceAdminClient', + method: 'listInstances', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, }, - reqOpts - ); - delete (gaxOpts as GetInstancesOptions).pageToken; - delete (gaxOpts as GetInstancesOptions).pageSize; - } - - this.request( - { - client: 'InstanceAdminClient', - method: 'listInstances', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, instances, nextPageRequest, ...args) => { - let instanceInstances: Instance[] | null = null; - if (instances) { - instanceInstances = instances.map(instance => { - const instanceInstance = self.instance(instance.name); - instanceInstance.metadata = instance; - return instanceInstance; - }); + (err, instances, nextPageRequest, ...args) => { + let instanceInstances: Instance[] | null = null; + if (instances) { + instanceInstances = instances.map(instance => { + const instanceInstance = self.instance(instance.name); + instanceInstance.metadata = instance; + return instanceInstance; + }); + } + if (err) { + setSpanError(span, err); + } + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; + span.end(); + callback!(err, instanceInstances, nextQuery, ...args); } - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; - callback!(err, instanceInstances, nextQuery, ...args); - } - ); + ); + }); } /** @@ -922,67 +958,78 @@ class Spanner extends GrpcService { config: CreateInstanceConfigRequest, callback?: CreateInstanceConfigCallback ): void | Promise { - if (!name) { - throw new GoogleError('A name is required to create an instance config.'); - } - if (!config) { - throw new GoogleError( - [ - 'A configuration object is required to create an instance config.', - ].join('') - ); - } - if (!config.baseConfig) { - throw new GoogleError( - ['Base instance config is required to create an instance config.'].join( - '' - ) - ); - } - const formattedName = InstanceConfig.formatName_(this.projectId, name); - const displayName = config.displayName || formattedName.split('/').pop(); - const reqOpts = { - parent: this.projectFormattedName_, - instanceConfigId: formattedName.split('/').pop(), - instanceConfig: extend( - { - name: formattedName, - displayName, - }, - config - ), - validateOnly: config.validateOnly, - }; + return startTrace('SpannerClient.createInstanceConfig', {}, span => { + if (!name) { + const msg = 'A name is required to create an instance config.'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError(msg); + } + if (!config) { + const msg = + 'A configuration object is required to create an instance config.'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError(msg); + } + if (!config.baseConfig) { + const msg = + 'Base instance config is required to create an instance config.'; + setSpanError(span, msg); + span.recordException(msg); + span.end(); + throw new GoogleError([msg].join('')); + } + const formattedName = InstanceConfig.formatName_(this.projectId, name); + const displayName = config.displayName || formattedName.split('/').pop(); + const reqOpts = { + parent: this.projectFormattedName_, + instanceConfigId: formattedName.split('/').pop(), + instanceConfig: extend( + { + name: formattedName, + displayName, + }, + config + ), + validateOnly: config.validateOnly, + }; - if (config.baseConfig!.indexOf('/') === -1) { - reqOpts.instanceConfig.baseConfig = `projects/${this.projectId}/instanceConfigs/${config.baseConfig}`; - } + if (config.baseConfig!.indexOf('/') === -1) { + reqOpts.instanceConfig.baseConfig = `projects/${this.projectId}/instanceConfigs/${config.baseConfig}`; + } - // validateOnly need not be passed in if it is null. - if (reqOpts.validateOnly === null || reqOpts.validateOnly === undefined) - delete reqOpts.validateOnly; + // validateOnly need not be passed in if it is null. + if (reqOpts.validateOnly === null || reqOpts.validateOnly === undefined) + delete reqOpts.validateOnly; - // validateOnly and gaxOptions are not fields in InstanceConfig. - delete reqOpts.instanceConfig.validateOnly; - delete reqOpts.instanceConfig.gaxOptions; + // validateOnly and gaxOptions are not fields in InstanceConfig. + delete reqOpts.instanceConfig.validateOnly; + delete reqOpts.instanceConfig.gaxOptions; - this.request( - { - client: 'InstanceAdminClient', - method: 'createInstanceConfig', - reqOpts, - gaxOpts: config.gaxOptions, - headers: this.resourceHeader_, - }, - (err, operation, resp) => { - if (err) { - callback!(err, null, null, resp); - return; + this.request( + { + client: 'InstanceAdminClient', + method: 'createInstanceConfig', + reqOpts, + gaxOpts: config.gaxOptions, + headers: this.resourceHeader_, + }, + (err, operation, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, null, resp); + return; + } + const instanceConfig = this.instanceConfig(formattedName); + span.end(); + callback!(null, instanceConfig, operation, resp); } - const instanceConfig = this.instanceConfig(formattedName); - callback!(null, instanceConfig, operation, resp); - } - ); + ); + }); } /** @@ -1081,49 +1128,57 @@ class Spanner extends GrpcService { optionsOrCallback?: GetInstanceConfigsOptions | GetInstanceConfigsCallback, cb?: GetInstanceConfigsCallback ): Promise | void { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetInstanceConfigsOptions); - - const gaxOpts = extend(true, {}, options.gaxOptions); - let reqOpts = extend({}, options, { - parent: 'projects/' + this.projectId, - }); - delete reqOpts.gaxOptions; + return startTrace('SpannerClient.getInstanceConfigs', {}, span => { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetInstanceConfigsOptions); + + const gaxOpts = extend(true, {}, options.gaxOptions); + let reqOpts = extend({}, options, { + parent: 'projects/' + this.projectId, + }); + delete reqOpts.gaxOptions; + + // Copy over pageSize and pageToken values from gaxOptions. + // However values set on options take precedence. + if (gaxOpts) { + // TODO: Annotate the span with the pageSize and pageToken values. + reqOpts = extend( + {}, + { + pageSize: (gaxOpts as GetInstanceConfigsOptions).pageSize, + pageToken: (gaxOpts as GetInstanceConfigsOptions).pageToken, + }, + reqOpts + ); + delete (gaxOpts as GetInstanceConfigsOptions).pageSize; + delete (gaxOpts as GetInstanceConfigsOptions).pageToken; + } - // Copy over pageSize and pageToken values from gaxOptions. - // However values set on options take precedence. - if (gaxOpts) { - reqOpts = extend( - {}, + return this.request( { - pageSize: (gaxOpts as GetInstanceConfigsOptions).pageSize, - pageToken: (gaxOpts as GetInstanceConfigsOptions).pageToken, + client: 'InstanceAdminClient', + method: 'listInstanceConfigs', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, }, - reqOpts - ); - delete (gaxOpts as GetInstanceConfigsOptions).pageSize; - delete (gaxOpts as GetInstanceConfigsOptions).pageToken; - } + (err, instanceConfigs, nextPageRequest, ...args) => { + if (err) { + setSpanError(span, err); + } - return this.request( - { - client: 'InstanceAdminClient', - method: 'listInstanceConfigs', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, instanceConfigs, nextPageRequest, ...args) => { - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; - callback!(err, instanceConfigs, nextQuery, ...args); - } - ); + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; + span.end(); + callback!(err, instanceConfigs, nextQuery, ...args); + } + ); + }); } /** @@ -1265,33 +1320,39 @@ class Spanner extends GrpcService { optionsOrCallback?: GetInstanceConfigOptions | GetInstanceConfigCallback, cb?: GetInstanceConfigCallback ): Promise | void { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetInstanceConfigOptions); - - const reqOpts = extend( - {}, - { - name: 'projects/' + this.projectId + '/instanceConfigs/' + name, - } - ); - const gaxOpts = extend({}, options.gaxOptions); + return startTrace('SpannerClient.getInstanceConfig', {}, span => { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetInstanceConfigOptions); + + const reqOpts = extend( + {}, + { + name: 'projects/' + this.projectId + '/instanceConfigs/' + name, + } + ); + const gaxOpts = extend({}, options.gaxOptions); - return this.request( - { - client: 'InstanceAdminClient', - method: 'getInstanceConfig', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, instanceConfig) => { - callback!(err, instanceConfig); - } - ); + return this.request( + { + client: 'InstanceAdminClient', + method: 'getInstanceConfig', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, instanceConfig) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, instanceConfig); + } + ); + }); } /** @@ -1373,49 +1434,57 @@ class Spanner extends GrpcService { | GetInstanceConfigOperationsCallback, cb?: GetInstanceConfigOperationsCallback ): void | Promise { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetInstanceConfigOperationsOptions); - const gaxOpts = extend(true, {}, options.gaxOptions); - let reqOpts = extend({}, options, { - parent: this.projectFormattedName_, - }); - delete reqOpts.gaxOptions; + return startTrace('SpannerClient.getInstanceConfigOperations', {}, span => { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetInstanceConfigOperationsOptions); + const gaxOpts = extend(true, {}, options.gaxOptions); + let reqOpts = extend({}, options, { + parent: this.projectFormattedName_, + }); + delete reqOpts.gaxOptions; + + // Copy over pageSize and pageToken values from gaxOptions. + // However, values set on options take precedence. + if (gaxOpts) { + reqOpts = extend( + {}, + { + pageSize: (gaxOpts as GetInstanceConfigOperationsOptions).pageSize, + pageToken: (gaxOpts as GetInstanceConfigOperationsOptions) + .pageToken, + }, + reqOpts + ); + delete (gaxOpts as GetInstanceConfigOperationsOptions).pageSize; + delete (gaxOpts as GetInstanceConfigOperationsOptions).pageToken; + } - // Copy over pageSize and pageToken values from gaxOptions. - // However, values set on options take precedence. - if (gaxOpts) { - reqOpts = extend( - {}, + this.request( { - pageSize: (gaxOpts as GetInstanceConfigOperationsOptions).pageSize, - pageToken: (gaxOpts as GetInstanceConfigOperationsOptions).pageToken, + client: 'InstanceAdminClient', + method: 'listInstanceConfigOperations', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, }, - reqOpts - ); - delete (gaxOpts as GetInstanceConfigOperationsOptions).pageSize; - delete (gaxOpts as GetInstanceConfigOperationsOptions).pageToken; - } + (err, operations, nextPageRequest, ...args) => { + if (err) { + setSpanError(span, err); + } - this.request( - { - client: 'InstanceAdminClient', - method: 'listInstanceConfigOperations', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, operations, nextPageRequest, ...args) => { - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; - callback!(err, operations, nextQuery, ...args); - } - ); + span.end(); + callback!(err, operations, nextQuery, ...args); + } + ); + }); } /** @@ -1482,11 +1551,17 @@ class Spanner extends GrpcService { * @param {function} callback Callback function */ prepareGapicRequest_(config, callback) { + const span = getActiveOrNoopSpan(); + span.addEvent('prepareGapicRequest'); + this.auth.getProjectId((err, projectId) => { if (err) { + span.addEvent('failed to correctly retrieve the projectId'); + setSpanError(span, err); callback(err); return; } + const clientName = config.client; if (!this.clients_.has(clientName)) { this.clients_.set(clientName, new v1[clientName](this.options)); @@ -1532,6 +1607,7 @@ class Spanner extends GrpcService { }, }) ); + callback(null, requestFn); }); } diff --git a/src/instance.ts b/src/instance.ts index 00b584e42..e78b2a563 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -51,6 +51,7 @@ import {google as instanceAdmin} from '../protos/protos'; import {google as databaseAdmin} from '../protos/protos'; import {google as spannerClient} from '../protos/protos'; import {CreateInstanceRequest} from './index'; +import {startTrace, setSpanError} from './instrument'; export type IBackup = databaseAdmin.spanner.admin.database.v1.IBackup; export type IDatabase = databaseAdmin.spanner.admin.database.v1.IDatabase; @@ -876,58 +877,68 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: CreateDatabaseOptions | CreateDatabaseCallback, cb?: CreateDatabaseCallback ): void | Promise { - if (!name) { - throw new GoogleError('A name is required to create a database.'); - } - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as CreateDatabaseOptions); - - const poolOptions = options.poolOptions; - const poolCtor = options.poolCtor; - let createStatement = 'CREATE DATABASE `' + name.split('/').pop() + '`'; - if ( - databaseAdmin.spanner.admin.database.v1.DatabaseDialect.POSTGRESQL === - options.databaseDialect - ) { - createStatement = 'CREATE DATABASE "' + name.split('/').pop() + '"'; - } - const reqOpts = extend( - { - parent: this.formattedName_, - createStatement: createStatement, - }, - options - ); + return startTrace('Instance.createDatabase', {}, span => { + if (!name) { + const msg = 'A name is required to create a database.'; + setSpanError(span, msg); + span.end(); + throw new GoogleError(msg); + } - delete reqOpts.poolOptions; - delete reqOpts.poolCtor; - delete reqOpts.gaxOptions; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as CreateDatabaseOptions); - if (reqOpts.schema) { - reqOpts.extraStatements = arrify(reqOpts.schema); - delete reqOpts.schema; - } - this.request( - { - client: 'DatabaseAdminClient', - method: 'createDatabase', - reqOpts, - gaxOpts: options.gaxOptions, - headers: this.resourceHeader_, - }, - (err, operation, resp) => { - if (err) { - callback(err, null, null, resp); - return; - } - const database = this.database(name, poolOptions || poolCtor); - callback(null, database, operation, resp); + const poolOptions = options.poolOptions; + const poolCtor = options.poolCtor; + let createStatement = 'CREATE DATABASE `' + name.split('/').pop() + '`'; + if ( + databaseAdmin.spanner.admin.database.v1.DatabaseDialect.POSTGRESQL === + options.databaseDialect + ) { + createStatement = 'CREATE DATABASE "' + name.split('/').pop() + '"'; } - ); + const reqOpts = extend( + { + parent: this.formattedName_, + createStatement: createStatement, + }, + options + ); + + delete reqOpts.poolOptions; + delete reqOpts.poolCtor; + delete reqOpts.gaxOptions; + + if (reqOpts.schema) { + reqOpts.extraStatements = arrify(reqOpts.schema); + delete reqOpts.schema; + } + this.request( + { + client: 'DatabaseAdminClient', + method: 'createDatabase', + reqOpts, + gaxOpts: options.gaxOptions, + headers: this.resourceHeader_, + }, + (err, operation, resp) => { + if (err) { + // TODO: Infer the status and code from translating the error. + setSpanError(span, err); + span.end(); + callback(err, null, null, resp); + return; + } + const database = this.database(name, poolOptions || poolCtor); + span.end(); + callback(null, database, operation, resp); + } + ); + }); } /** @@ -1034,38 +1045,45 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | DeleteInstanceCallback, cb?: DeleteInstanceCallback ): void | Promise { - const gaxOpts = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + return startTrace('Instance.delete', {}, span => { + const gaxOpts = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const reqOpts = { - name: this.formattedName_, - }; - Promise.all( - Array.from(this.databases_.values()).map(database => { - return database.close(); - }) - ) - .catch(() => {}) - .then(() => { - this.databases_.clear(); - this.request( - { - client: 'InstanceAdminClient', - method: 'deleteInstance', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, resp) => { - if (!err) { - this.parent.instances_.delete(this.id); + const reqOpts = { + name: this.formattedName_, + }; + Promise.all( + Array.from(this.databases_.values()).map(database => { + return database.close(); + }) + ) + .catch(() => {}) + .then(() => { + this.databases_.clear(); + this.request( + { + client: 'InstanceAdminClient', + method: 'deleteInstance', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, resp) => { + if (!err) { + // TODO: Create a sub-span about invoking instances_.delete + this.parent.instances_.delete(this.id); + } else { + setSpanError(span, err); + } + + span.end(); + callback!(err, resp!); } - callback!(err, resp!); - } - ); - }); + ); + }); + }); } /** @@ -1111,21 +1129,29 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | ExistsInstanceCallback, cb?: ExistsInstanceCallback ): void | Promise { - const gaxOptions = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + return startTrace('Instance.exists', {}, span => { + const gaxOptions = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const NOT_FOUND = 5; + const NOT_FOUND = 5; - this.getMetadata({gaxOptions}, err => { - if (err && err.code !== NOT_FOUND) { - callback!(err, null); - return; - } + this.getMetadata({gaxOptions}, err => { + if (err) { + setSpanError(span, err); + } - const exists = !err || err.code !== NOT_FOUND; - callback!(null, exists); + if (err && err.code !== NOT_FOUND) { + span.end(); + callback!(err, null); + return; + } + + const exists = !err || err.code !== NOT_FOUND; + span.end(); + callback!(null, exists); + }); }); } @@ -1184,27 +1210,43 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: GetInstanceConfig | GetInstanceCallback, cb?: GetInstanceCallback ): void | Promise { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetInstanceConfig); + return startTrace('Instance.get', {}, span => { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetInstanceConfig); - const getMetadataOptions: GetInstanceMetadataOptions = new Object(null); - if (options.fieldNames) { - getMetadataOptions.fieldNames = options.fieldNames; - } - if (options.gaxOptions) { - getMetadataOptions.gaxOptions = options.gaxOptions; - } + const getMetadataOptions: GetInstanceMetadataOptions = new Object(null); + if (options.fieldNames) { + getMetadataOptions.fieldNames = options.fieldNames; + } + if (options.gaxOptions) { + getMetadataOptions.gaxOptions = options.gaxOptions; + } + + this.getMetadata(getMetadataOptions, (err, metadata) => { + if (!err) { + span.end(); + callback(null, this, metadata!); + return; + } + + // Otherwise an error occurred. + setSpanError(span, err); + + if (err.code !== 5 || !options.autoCreate) { + span.end(); + callback(err); + return; + } - this.getMetadata(getMetadataOptions, (err, metadata) => { - if (err) { - if (err.code === 5 && options.autoCreate) { - const createOptions = extend(true, {}, options); - delete createOptions.fieldNames; - delete createOptions.autoCreate; + // Attempt to create the instance. + const createOptions = extend(true, {}, options); + delete createOptions.fieldNames; + delete createOptions.autoCreate; + return startTrace('Instance.create', {}, createSpan => { this.create( createOptions, ( @@ -1213,23 +1255,32 @@ class Instance extends common.GrpcServiceObject { operation?: GaxOperation | null ) => { if (err) { + setSpanError(createSpan, err); + createSpan.end(); + span.end(); callback(err); return; } + + // Otherwise attempt the creation operation. operation! - .on('error', callback) + .on('error', (err, obj, metadata) => { + setSpanError(createSpan, err); + createSpan.end(); + span.end(); + callback(err, obj, metadata); + }) .on('complete', (metadata: IInstance) => { this.metadata = metadata; + createSpan.end(); + span.end(); callback(null, this, metadata); }); } ); return; - } - callback(err); - return; - } - callback(null, this, metadata!); + }); + }); }); } @@ -1313,63 +1364,72 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: GetDatabasesOptions | GetDatabasesCallback, cb?: GetDatabasesCallback ): void | Promise { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const self = this; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' - ? optionsOrCallback - : ({} as GetDatabasesOptions); + return startTrace('Instance.getDatabases', {}, span => { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' + ? optionsOrCallback + : ({} as GetDatabasesOptions); - const gaxOpts = extend(true, {}, options.gaxOptions); - let reqOpts = extend({}, options, { - parent: this.formattedName_, - }); - delete reqOpts.gaxOptions; + const gaxOpts = extend(true, {}, options.gaxOptions); + let reqOpts = extend({}, options, { + parent: this.formattedName_, + }); + delete reqOpts.gaxOptions; - // Copy over pageSize and pageToken values from gaxOptions. - // However values set on options take precedence. - if (gaxOpts) { - reqOpts = extend( - {}, + // Copy over pageSize and pageToken values from gaxOptions. + // However values set on options take precedence. + if (gaxOpts) { + reqOpts = extend( + {}, + { + pageSize: (gaxOpts as GetBackupsOptions).pageSize, + pageToken: (gaxOpts as GetBackupsOptions).pageToken, + }, + reqOpts + ); + delete (gaxOpts as GetBackupsOptions).pageSize; + delete (gaxOpts as GetBackupsOptions).pageToken; + } + + this.request< + IDatabase, + databaseAdmin.spanner.admin.database.v1.IListDatabasesResponse + >( { - pageSize: (gaxOpts as GetBackupsOptions).pageSize, - pageToken: (gaxOpts as GetBackupsOptions).pageToken, + client: 'DatabaseAdminClient', + method: 'listDatabases', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, }, - reqOpts - ); - delete (gaxOpts as GetBackupsOptions).pageSize; - delete (gaxOpts as GetBackupsOptions).pageToken; - } + (err, rowDatabases, nextPageRequest, ...args) => { + let databases: Database[] | null = null; + if (rowDatabases) { + databases = rowDatabases.map(database => { + const databaseInstance = self.database(database.name!, { + min: 0, + }); + databaseInstance.metadata = database; + return databaseInstance; + }); + } + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; - this.request< - IDatabase, - databaseAdmin.spanner.admin.database.v1.IListDatabasesResponse - >( - { - client: 'DatabaseAdminClient', - method: 'listDatabases', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, rowDatabases, nextPageRequest, ...args) => { - let databases: Database[] | null = null; - if (rowDatabases) { - databases = rowDatabases.map(database => { - const databaseInstance = self.database(database.name!, {min: 0}); - databaseInstance.metadata = database; - return databaseInstance; - }); - } - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; + if (err) { + setSpanError(span, err); + } - callback(err, databases, nextQuery, ...args); - } - ); + span.end(); + callback(err, databases, nextQuery, ...args); + } + ); + }); } /** @@ -1412,6 +1472,7 @@ class Instance extends common.GrpcServiceObject { * ``` */ getDatabasesStream(options: GetDatabasesOptions = {}): NodeJS.ReadableStream { + // TODO: Instrument this streaming method with Otel. const gaxOpts = extend(true, {}, options.gaxOptions); let reqOpts = extend({}, options, { @@ -1518,33 +1579,41 @@ class Instance extends common.GrpcServiceObject { | GetInstanceMetadataCallback, cb?: GetInstanceMetadataCallback ): Promise | void { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const reqOpts = { - name: this.formattedName_, - }; - if (options.fieldNames) { - reqOpts['fieldMask'] = { - paths: arrify(options['fieldNames']!).map(snakeCase), + return startTrace('Instance.getMetadata', {}, span => { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const reqOpts = { + name: this.formattedName_, }; - } - return this.request( - { - client: 'InstanceAdminClient', - method: 'getInstance', - reqOpts, - gaxOpts: options.gaxOptions, - headers: this.resourceHeader_, - }, - (err, resp) => { - if (resp) { - this.metadata = resp; - } - callback!(err, resp); + if (options.fieldNames) { + reqOpts['fieldMask'] = { + paths: arrify(options['fieldNames']!).map(snakeCase), + }; } - ); + return this.request( + { + client: 'InstanceAdminClient', + method: 'getInstance', + reqOpts, + gaxOpts: options.gaxOptions, + headers: this.resourceHeader_, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + } + + if (resp) { + this.metadata = resp; + } + + span.end(); + callback!(err, resp); + } + ); + }); } /** @@ -1610,32 +1679,40 @@ class Instance extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | SetInstanceMetadataCallback, cb?: SetInstanceMetadataCallback ): void | Promise { - const gaxOpts = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + return startTrace('Instance.setMetadata', {}, span => { + const gaxOpts = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const reqOpts = { - instance: extend( + const reqOpts = { + instance: extend( + { + name: this.formattedName_, + }, + metadata + ), + fieldMask: { + paths: Object.keys(metadata).map(snakeCase), + }, + }; + return this.request( { - name: this.formattedName_, + client: 'InstanceAdminClient', + method: 'updateInstance', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, }, - metadata - ), - fieldMask: { - paths: Object.keys(metadata).map(snakeCase), - }, - }; - return this.request( - { - client: 'InstanceAdminClient', - method: 'updateInstance', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - callback! - ); + (err, operation, apiResponse) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, operation, apiResponse); + } + ); + }); } /** * Format the instance name to include the project ID. diff --git a/src/instrument.ts b/src/instrument.ts new file mode 100644 index 000000000..982af0575 --- /dev/null +++ b/src/instrument.ts @@ -0,0 +1,249 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + SEMATTRS_DB_STATEMENT, + SEMATTRS_DB_SYSTEM, + SEMATTRS_DB_SQL_TABLE, +} from '@opentelemetry/semantic-conventions'; + +const {TracerProvider} = require('@opentelemetry/sdk-trace-node'); + +// Optional instrumentation that the user will configure if they'd like to. +const { + Instrumentation, + registerInstrumentations, +} = require('@opentelemetry/instrumentation'); + +import { + ContextManager, + Span, + SpanStatusCode, + context, + trace, + INVALID_SPAN_CONTEXT, + SpanAttributes, + TimeInput, + Link, + Exception, + SpanContext, + SpanStatus, + SpanKind, +} from '@opentelemetry/api'; + +let optedInPII: boolean = process.env.SPANNER_NODEJS_ANNOTATE_PII_SQL === '1'; + +interface SQLStatement { + sql: string; +} + +interface observabilityOptions { + tracerProvider: typeof TracerProvider; + enableExtendedTracing: boolean; +} + +export type {observabilityOptions as ObservabilityOptions}; + +export function applyObservabilityOptions( + opts: observabilityOptions | undefined +) { + if (!opts) { + return; + } + + if (opts.tracerProvider) { + setTracerProvider(opts.tracerProvider); + } + + if (opts.enableExtendedTracing) { + optInForSQLStatementOnSpans(); + } +} + +/* +------------------------------------------------------- +Notes and requests from peer review: +------------------------------------------------------- +* TODO: Allow the TracerProvider to be explicitly + added to receive Cloud Spanner traces. +* TODO: Read Java Spanner to find the nodeTracerProvider + and find out how they inject it locally or use it globally + please see https://github.com/googleapis/java-spanner?tab=readme-ov-file#opentelemetry-configuration. +*/ + +let defaultTracerProvider: typeof TracerProvider = undefined; + +// setTracerProvider allows the caller to hook up an OpenTelemetry +// TracerProvider that spans generated from this library shall be attached to, +// instead of using the global configuration. Later on if `getTracer` is invoked and +// the default tracerProvider is unset, it'll use the global tracer +// otherwise it'll use the set TracerProvider. +export function setTracerProvider(freshTracerProvider: typeof TracerProvider) { + defaultTracerProvider = freshTracerProvider; +} + +const TRACER_NAME = 'nodejs-spanner'; + +// getTracer fetches the tracer each time that it is invoked to solve +// the problem of observability being initialized after Spanner objects +// have been already created. +export function getTracer(config?: traceConfig) { + if (config && config.opts && config.opts.tracerProvider) { + return config.opts.tracerProvider.getTracer(TRACER_NAME); + } + if (defaultTracerProvider) { + return defaultTracerProvider.getTracer(TRACER_NAME); + } + // Otherwise use the global tracer. + return trace.getTracer(TRACER_NAME); +} + +// optInForSQLStatementOnSpans is a configurable knob that if +// invoked allows spans to be annotated with the SQL statement +// of the producing function; if optOutofSQLStatementOnSpans +// is invoked the SQL statement annotation shall be dropped. +export function optInForSQLStatementOnSpans() { + optedInPII = true; +} + +export function optOutOfSQLStatementOnSpans() { + optedInPII = false; +} + +interface traceConfig { + sql?: string | SQLStatement; + tableName?: string; + opts?: observabilityOptions; + enableExtendedTracing?: boolean; +} + +export function startTrace ReturnType>( + spanNameSuffix: string, + opts: traceConfig | undefined, + cb: F +): ReturnType { + const origOpts = opts; + opts = opts || {}; + if (typeof origOpts === 'string') { + opts.sql = origOpts as string; + } + + return getTracer(opts).startActiveSpan( + 'cloud.google.com/nodejs/spanner/' + spanNameSuffix, + {kind: SpanKind.CLIENT}, + span => { + span.setAttribute(SEMATTRS_DB_SYSTEM, 'spanner'); + + if (opts.tableName) { + span.setAttribute(SEMATTRS_DB_SQL_TABLE, opts.tableName); + } + + if (opts.sql && (opts.enableExtendedTracing || optedInPII)) { + const sql = opts.sql; + if (typeof sql === 'string') { + span.setAttribute(SEMATTRS_DB_STATEMENT, sql as string); + } else { + const stmt = sql as SQLStatement; + span.setAttribute(SEMATTRS_DB_STATEMENT, stmt.sql); + } + } + + return cb(span); + } + ); +} + +// setSpanError sets err, if non-nil onto the span with +// status.code=ERROR and the message of err.toString() +export function setSpanError(span: Span, err: Error | String) { + if (!err || !span) { + return; + } + + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.toString(), + }); +} + +export function setGlobalContextManager(manager: ContextManager) { + context.setGlobalContextManager(manager); +} + +export function disableContextAndManager(manager: typeof ContextManager) { + manager.disable(); + context.disable(); +} + +// getActiveOrNoopSpan queries the tracer for the currently active span +// and returns it, otherwise if there is no active span available, it'll +// simply create a NoopSpan. This is important in the cases where we don't +// want to create a new span such is sensitive and frequently called code +// for which the new spans would be too many and thus pollute the trace, +// but yet we'd like to record an important annotation. +export function getActiveOrNoopSpan(): Span { + const span = trace.getActiveSpan(); + if (span) { + return span; + } + return new noopSpan(); +} + +// noopSpan is a pass-through Span that does nothing and shall not +// be exported, nor added into any context. It serves as a placeholder +// to allow calls in sensitive areas like sessionPools to transparently +// add attributes to spans without lots of ugly null checks. +class noopSpan implements Span { + constructor() {} + + spanContext(): SpanContext { + return INVALID_SPAN_CONTEXT; + } + + setAttribute(key: string, value: unknown): this { + return this; + } + + setAttributes(attributes: SpanAttributes): this { + return this; + } + + addEvent(name: string, attributes?: SpanAttributes): this { + return this; + } + + addLink(link: Link): this { + return this; + } + + addLinks(links: Link[]): this { + return this; + } + + setStatus(status: SpanStatus): this { + return this; + } + + end(endTime?: TimeInput): void {} + + isRecording(): boolean { + return false; + } + + recordException(exc: Exception, timeAt?: TimeInput): void {} + + updateName(name: string): this { + return this; + } +} diff --git a/src/session-pool.ts b/src/session-pool.ts index cfc2a4360..8ac0150de 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,6 +24,7 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); +import {getActiveOrNoopSpan, setSpanError} from './instrument'; /** * @callback SessionPoolCloseCallback @@ -630,8 +631,13 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @returns {Promise} */ async _acquire(): Promise { + const span = getActiveOrNoopSpan(); if (!this.isOpen) { - throw new GoogleError(errors.Closed); + span.addEvent('session pool is not open'); + const err = new GoogleError(errors.Closed); + span.recordException(err); + setSpanError(span, err); + throw err; } // Get the stacktrace of the caller before we call any async methods, as calling an async method will break the stacktrace. @@ -645,12 +651,17 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { const elapsed = Date.now() - startTime; if (elapsed >= timeout!) { + span.recordException('timed out'); throw new GoogleError(errors.Timeout); } const session = await this._getSession(startTime); if (this._isValidSession(session)) { + span.addEvent('acquired a valid session', { + 'time.elapsed': Date.now() - startTime, + 'session.id': session.id.toString(), + }); return session; } @@ -686,7 +697,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @return {Session} */ _borrowFrom(): Session { + const span = getActiveOrNoopSpan(); const session = this._inventory.sessions.pop()!; + span.addEvent('popped session from inventory'); this._inventory.borrowed.add(session); return session; } @@ -723,6 +736,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @emits SessionPool#createError */ async _createSessions(amount: number): Promise { + const span = getActiveOrNoopSpan(); + span.setAttribute('session.requested.count', amount); + const labels = this.options.labels!; const databaseRole = this.options.databaseRole!; @@ -736,6 +752,8 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { while (amount > 0) { let sessions: Session[] | null = null; + span.addEvent(`trying to create ${amount} sessions`); + try { [sessions] = await this.database.batchCreateSessions({ count: amount, @@ -747,9 +765,12 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { } catch (e) { this._pending -= amount; this.emit('createError', e); + span.recordException(e as Error); + setSpanError(span, e as Error); throw e; } + span.addEvent(`started adding ${amount} sessions to the inventory`); sessions.forEach((session: Session) => { setImmediate(() => { this._inventory.borrowed.add(session); @@ -757,6 +778,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { this.release(session); }); }); + span.addEvent(`finished adding ${amount} sessions to the inventory`); } } @@ -771,10 +793,14 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @returns {Promise} */ async _destroy(session: Session): Promise { + const span = getActiveOrNoopSpan(); + try { await this._requests.add(() => session.delete()); } catch (e) { this.emit('error', e); + span.recordException(e as Error); + setSpanError(span, e as Error); } } @@ -865,21 +891,36 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @returns {Promise} */ async _getSession(startTime: number): Promise { + const span = getActiveOrNoopSpan(); + span.addEvent('Attempting to get session'); + if (this._hasSessionUsableFor()) { - return this._borrowNextAvailableSession(); + span.addEvent('Cache hit: has usable session'); + const sessPromise = this._borrowNextAvailableSession(); + return sessPromise; } + if (this.isFull && this.options.fail!) { - throw new SessionPoolExhaustedError(this._getLeaks()); + span.addEvent('session pool is full and failFast=true'); + const err = new SessionPoolExhaustedError(this._getLeaks()); + span.recordException(err); + setSpanError(span, err); + throw err; } let removeOnceCloseListener: Function; let removeListener: Function; // Wait for a session to become available. + span.addEvent('waiting for a session to become available'); const availableEvent = 'session-available'; const promises = [ new Promise((_, reject) => { - const onceCloseListener = () => reject(new GoogleError(errors.Closed)); + const onceCloseListener = () => { + const err = new GoogleError(errors.Closed); + setSpanError(span, err); + reject(err); + }; this.once('close', onceCloseListener); removeOnceCloseListener = this.removeListener.bind( this, @@ -898,6 +939,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { ]; const timeout = this.options.acquireTimeout; + if (timeout !== undefined) { + span.setAttribute('sessionpool.timeout', timeout); + } let removeTimeoutListener = () => {}; if (!is.infinite(timeout!)) { @@ -919,9 +963,11 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { // Only create a new session if there are more waiters than sessions already // being created. The current requester will be waiter number _numWaiters+1. if (!this.isFull && this.totalPending <= this.totalWaiters) { + span.addEvent('pool is not full and more waiters than sessions'); let amount = this.options.incStep ? this.options.incStep : DEFAULTS.incStep!; + // Create additional sessions if the configured minimum has not been reached. const min = this.options.min ? this.options.min : 0; if (this.size + this.totalPending + amount < min) { @@ -931,11 +977,19 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { if (amount + this.size > this.options.max!) { amount = this.options.max! - this.size; } + span.addEvent('creating new session', { + 'total.pending': this.totalPending, + 'total.waiters': this.totalWaiters, + amount: amount, + min: min, + }); + if (amount > 0) { this._pending += amount; promises.push( new Promise((_, reject) => { this._pending -= amount; + span.addEvent('creating a new session'); this._createSessions(amount).catch(reject); }) ); @@ -990,6 +1044,10 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @returns {Promise} */ async _ping(session: Session): Promise { + // NOTE: Please do not trace Ping as it gets quite spammy + // with many root spans polluting the main span. + // Please see https://github.com/googleapis/google-cloud-go/issues/1691 + this._borrow(session); if (!this._isValidSession(session)) { @@ -1030,10 +1088,16 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @param {object} options The transaction options. */ _prepareTransaction(session: Session): void { + const span = getActiveOrNoopSpan(); + span.addEvent('creating transaction for session'); const transaction = session.transaction( (session.parent as Database).queryOptions_ ); session.txn = transaction; + span.addEvent('created transaction for session', { + 'session.id': session.id.toString(), + 'transaction.id': transaction.id?.toString(), + }); } /** @@ -1048,6 +1112,9 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @param {Session} session The session object. */ _release(session: Session): void { + const span = getActiveOrNoopSpan(); + span.addEvent('releasing session back to the pool'); + this._inventory.sessions.push(session); this._inventory.borrowed.delete(session); this._traces.delete(session.id); @@ -1056,6 +1123,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { this.emit('session-available'); this.emit('readonly-available'); this.emit('readwrite-available'); + span.addEvent('released session back to the pool'); } /** diff --git a/src/session.ts b/src/session.ts index 256d8af14..67eb13446 100644 --- a/src/session.ts +++ b/src/session.ts @@ -44,6 +44,7 @@ import { import {grpc, CallOptions} from 'google-gax'; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Spanner} from '.'; +import {startTrace, setSpanError} from './instrument'; export type GetSessionResponse = [Session, r.Response]; @@ -236,25 +237,33 @@ export class Session extends common.GrpcServiceObject { optionsOrCallback: CreateSessionOptions | CreateSessionCallback, callback: CreateSessionCallback ) => { - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - callback = - typeof optionsOrCallback === 'function' - ? optionsOrCallback - : callback; + return startTrace('Session.create', {}, span => { + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + callback = + typeof optionsOrCallback === 'function' + ? optionsOrCallback + : callback; - this.labels = options.labels || null; - this.databaseRole = - options.databaseRole || database.databaseRole || null; + this.labels = options.labels || null; + this.databaseRole = + options.databaseRole || database.databaseRole || null; - return database.createSession(options, (err, session, apiResponse) => { - if (err) { - callback(err, null, apiResponse); - return; - } + return database.createSession( + options, + (err, session, apiResponse) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err, null, apiResponse); + return; + } - extend(this, session); - callback(null, this, apiResponse); + extend(this, session); + span.end(); + callback(null, this, apiResponse); + } + ); }); }, } as {} as ServiceObjectConfig); @@ -375,36 +384,43 @@ export class Session extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | GetSessionMetadataCallback, cb?: GetSessionMetadataCallback ): void | Promise { - const gaxOpts = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + return startTrace('Session.getMetadata', {}, span => { + const gaxOpts = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const reqOpts = { - name: this.formattedName_, - }; + const reqOpts = { + name: this.formattedName_, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } - return this.request( - { - client: 'SpannerClient', - method: 'getSession', - reqOpts, - gaxOpts, - headers: headers, - }, - (err, resp) => { - if (resp) { - resp.databaseRole = resp.creatorRole; - delete resp.creatorRole; - this.metadata = resp; - } - callback!(err, resp); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + return this.request( + { + client: 'SpannerClient', + method: 'getSession', + reqOpts, + gaxOpts, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + } + + if (resp) { + resp.databaseRole = resp.creatorRole; + delete resp.creatorRole; + this.metadata = resp; + } + span.end(); + callback!(err, resp); + } + ); + }); } /** * Ping the session with `SELECT 1` to prevent it from expiring. @@ -431,25 +447,33 @@ export class Session extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | KeepAliveCallback, cb?: KeepAliveCallback ): void | Promise { - const gaxOpts = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + return startTrace('Session.keepAlive', {}, span => { + const gaxOpts = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const reqOpts = { - session: this.formattedName_, - sql: 'SELECT 1', - }; - return this.request( - { - client: 'SpannerClient', - method: 'executeSql', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - callback! - ); + const reqOpts = { + session: this.formattedName_, + sql: 'SELECT 1', + }; + return this.request( + { + client: 'SpannerClient', + method: 'executeSql', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + err => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err); + } + ); + }); } /** * Create a PartitionedDml transaction. diff --git a/src/table.ts b/src/table.ts index 343d4364b..822293245 100644 --- a/src/table.ts +++ b/src/table.ts @@ -31,6 +31,7 @@ import { import {google as databaseAdmin} from '../protos/protos'; import {Schema, LongRunningCallback} from './common'; import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions; +import {startTrace, setSpanError} from './instrument'; export type Key = string | string[]; @@ -1072,29 +1073,34 @@ class Table { options: MutateRowsOptions | CallOptions = {}, callback: CommitCallback ): void { - const requestOptions = - 'requestOptions' in options ? options.requestOptions : {}; + startTrace('Table.' + method, {}, span => { + const requestOptions = + 'requestOptions' in options ? options.requestOptions : {}; - const excludeTxnFromChangeStreams = - 'excludeTxnFromChangeStreams' in options - ? options.excludeTxnFromChangeStreams - : false; + const excludeTxnFromChangeStreams = + 'excludeTxnFromChangeStreams' in options + ? options.excludeTxnFromChangeStreams + : false; - this.database.runTransaction( - { - requestOptions: requestOptions, - excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, - }, - (err, transaction) => { - if (err) { - callback(err); - return; - } + this.database.runTransaction( + { + requestOptions: requestOptions, + excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, + }, + (err, transaction) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err); + return; + } - transaction![method](this.name, rows as Key[]); - transaction!.commit(options, callback); - } - ); + span.end(); + transaction![method](this.name, rows as Key[]); + transaction!.commit(options, callback); + } + ); + }); } } diff --git a/src/transaction.ts b/src/transaction.ts index 5617b4a3c..055e3bfc6 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,7 +22,7 @@ import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; import {common as p} from 'protobufjs'; -import {Readable, PassThrough} from 'stream'; +import {finished, Readable, PassThrough, Stream} from 'stream'; import {codec, Json, JSONOptions, Type, Value} from './codec'; import { @@ -45,6 +45,7 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; +import {startTrace, setSpanError} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -401,55 +402,60 @@ export class Snapshot extends EventEmitter { gaxOptionsOrCallback?: CallOptions | BeginTransactionCallback, cb?: BeginTransactionCallback ): void | Promise { - const gaxOpts = - typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; - const callback = - typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - - const session = this.session.formattedName_!; - const options = this._options; - const reqOpts: spannerClient.spanner.v1.IBeginTransactionRequest = { - session, - options, - }; - - // Only hand crafted read-write transactions will be able to set a - // transaction tag for the BeginTransaction RPC. Also, this.requestOptions - // is only set in the constructor of Transaction, which is the constructor - // for read/write transactions. - if (this.requestOptions) { - reqOpts.requestOptions = this.requestOptions; - } + return startTrace('Transaction.begin', {}, span => { + const gaxOpts = + typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + const callback = + typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + + const session = this.session.formattedName_!; + const options = this._options; + const reqOpts: spannerClient.spanner.v1.IBeginTransactionRequest = { + session, + options, + }; + + // Only hand crafted read-write transactions will be able to set a + // transaction tag for the BeginTransaction RPC. Also, this.requestOptions + // is only set in the constructor of Transaction, which is the constructor + // for read/write transactions. + if (this.requestOptions) { + reqOpts.requestOptions = this.requestOptions; + } - const headers = this.resourceHeader_; - if ( - this._getSpanner().routeToLeaderEnabled && - (this._options.readWrite !== undefined || - this._options.partitionedDml !== undefined) - ) { - addLeaderAwareRoutingHeader(headers); - } + const headers = this.resourceHeader_; + if ( + this._getSpanner().routeToLeaderEnabled && + (this._options.readWrite !== undefined || + this._options.partitionedDml !== undefined) + ) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'beginTransaction', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ITransaction - ) => { - if (err) { - callback!(err, resp); - return; + this.request( + { + client: 'SpannerClient', + method: 'beginTransaction', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ITransaction + ) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, resp); + return; + } + this._update(resp); + span.end(); + callback!(null, resp); } - this._update(resp); - callback!(null, resp); - } - ); + ); + }); } /** @@ -898,23 +904,31 @@ export class Snapshot extends EventEmitter { requestOrCallback: ReadRequest | ReadCallback, cb?: ReadCallback ): void | Promise { - const rows: Rows = []; + return startTrace('Transaction.read', {tableName: table}, span => { + const rows: Rows = []; - let request: ReadRequest; - let callback: ReadCallback; + let request: ReadRequest; + let callback: ReadCallback; - if (typeof requestOrCallback === 'function') { - request = {} as RequestOptions; - callback = requestOrCallback as ReadCallback; - } else { - request = requestOrCallback as RequestOptions; - callback = cb as ReadCallback; - } + if (typeof requestOrCallback === 'function') { + request = {} as RequestOptions; + callback = requestOrCallback as ReadCallback; + } else { + request = requestOrCallback as RequestOptions; + callback = cb as ReadCallback; + } - this.createReadStream(table, request) - .on('error', callback!) - .on('data', row => rows.push(row)) - .on('end', () => callback!(null, rows)); + this.createReadStream(table, request) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, null); + }) + .on('data', row => rows.push(row)) + .on('end', () => { + span.end(); + callback!(null, rows); + }); + }); } /** @@ -1000,23 +1014,32 @@ export class Snapshot extends EventEmitter { query: string | ExecuteSqlRequest, callback?: RunCallback ): void | Promise { - const rows: Rows = []; - let stats: google.spanner.v1.ResultSetStats; - let metadata: google.spanner.v1.ResultSetMetadata; - - this.runStream(query) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); + return startTrace('Transaction.run', {}, span => { + const rows: Rows = []; + let stats: google.spanner.v1.ResultSetStats; + let metadata: google.spanner.v1.ResultSetMetadata; + + this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + setSpanError(span, err); + span.end(); + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } - } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => callback!(null, rows, stats, metadata)); + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** @@ -1119,111 +1142,126 @@ export class Snapshot extends EventEmitter { * ``` */ runStream(query: string | ExecuteSqlRequest): PartialResultStream { - if (typeof query === 'string') { - query = {sql: query} as ExecuteSqlRequest; - } - - query = Object.assign({}, query) as ExecuteSqlRequest; - query.queryOptions = Object.assign( - Object.assign({}, this.queryOptions), - query.queryOptions - ); - - const { - gaxOptions, - json, - jsonOptions, - maxResumeRetries, - requestOptions, - columnsMetadata, - } = query; - let reqOpts; - - const directedReadOptions = this._getDirectedReadOptions( - query.directedReadOptions - ); - - const sanitizeRequest = () => { - query = query as ExecuteSqlRequest; - const {params, paramTypes} = Snapshot.encodeParams(query); - const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; - if (this.id) { - transaction.id = this.id as Uint8Array; - } else if (this._options.readWrite) { - transaction.begin = this._options; - } else { - transaction.singleUse = this._options; + if (typeof query === 'string') { + query = {sql: query} as ExecuteSqlRequest; } - delete query.gaxOptions; - delete query.json; - delete query.jsonOptions; - delete query.maxResumeRetries; - delete query.requestOptions; - delete query.types; - delete query.directedReadOptions; - delete query.columnsMetadata; - - reqOpts = Object.assign(query, { - session: this.session.formattedName_!, - seqno: this._seqno++, - requestOptions: this.configureTagOptions( - typeof transaction.singleUse !== 'undefined', - this.requestOptions?.transactionTag ?? undefined, - requestOptions - ), - directedReadOptions: directedReadOptions, - transaction, - params, - paramTypes, - }); - }; - const headers = this.resourceHeader_; - if ( - this._getSpanner().routeToLeaderEnabled && - (this._options.readWrite !== undefined || - this._options.partitionedDml !== undefined) - ) { - addLeaderAwareRoutingHeader(headers); - } + return startTrace('Transaction.runStream', query, span => { + query = Object.assign({}, query) as ExecuteSqlRequest; + query.queryOptions = Object.assign( + Object.assign({}, this.queryOptions), + query.queryOptions + ); - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts || (this.id && !reqOpts.transaction.id)) { - try { - sanitizeRequest(); - } catch (e) { - const errorStream = new PassThrough(); - setImmediate(() => errorStream.destroy(e as Error)); - return errorStream; + const { + gaxOptions, + json, + jsonOptions, + maxResumeRetries, + requestOptions, + columnsMetadata, + } = query; + let reqOpts; + + const directedReadOptions = this._getDirectedReadOptions( + query.directedReadOptions + ); + + const sanitizeRequest = () => { + query = query as ExecuteSqlRequest; + const {params, paramTypes} = Snapshot.encodeParams(query); + const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; + if (this.id) { + transaction.id = this.id as Uint8Array; + } else if (this._options.readWrite) { + transaction.begin = this._options; + } else { + transaction.singleUse = this._options; } + delete query.gaxOptions; + delete query.json; + delete query.jsonOptions; + delete query.maxResumeRetries; + delete query.requestOptions; + delete query.types; + delete query.directedReadOptions; + delete query.columnsMetadata; + + reqOpts = Object.assign(query, { + session: this.session.formattedName_!, + seqno: this._seqno++, + requestOptions: this.configureTagOptions( + typeof transaction.singleUse !== 'undefined', + this.requestOptions?.transactionTag ?? undefined, + requestOptions + ), + directedReadOptions: directedReadOptions, + transaction, + params, + paramTypes, + }); + }; + + const headers = this.resourceHeader_; + if ( + this._getSpanner().routeToLeaderEnabled && + (this._options.readWrite !== undefined || + this._options.partitionedDml !== undefined) + ) { + addLeaderAwareRoutingHeader(headers); } - return this.requestStream({ - client: 'SpannerClient', - method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - .on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { + try { + sanitizeRequest(); + } catch (e) { + const errorStream = new PassThrough(); + setSpanError(span, e as Error); + span.recordException(e as Error); + setImmediate(() => errorStream.destroy(e as Error)); + return errorStream; + } } + + return this.requestStream({ + client: 'SpannerClient', + method: 'executeStreamingSql', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const prs = partialResultStream(this._wrapWithIdWaiter(makeRequest), { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, }) - .on('error', () => { - if (!this.id && this._useInRunner) { - this.begin(); - } - }); + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) { + this.begin(); + } + }); + + if (prs instanceof Stream) { + finished(prs, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return prs; + }); } /** @@ -1513,26 +1551,33 @@ export class Dml extends Snapshot { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - if (typeof query === 'string') { - query = {sql: query} as ExecuteSqlRequest; - } + return startTrace('Transaction.runUpdate', {}, span => { + if (typeof query === 'string') { + query = {sql: query} as ExecuteSqlRequest; + } - this.run( - query, - ( - err: null | grpc.ServiceError, - rows: Rows, - stats: spannerClient.spanner.v1.ResultSetStats - ) => { - let rowCount = 0; - - if (stats && stats.rowCount) { - rowCount = Math.floor(stats[stats.rowCount] as number); - } + this.run( + query, + ( + err: null | grpc.ServiceError, + rows: Rows, + stats: spannerClient.spanner.v1.ResultSetStats + ) => { + let rowCount = 0; + + if (stats && stats.rowCount) { + rowCount = Math.floor(stats[stats.rowCount] as number); + } - callback!(err, rowCount); - } - ); + if (err) { + setSpanError(span, err); + } + + span.end(); + callback!(err, rowCount); + } + ); + }); } } @@ -1733,110 +1778,120 @@ export class Transaction extends Dml { optionsOrCallback?: BatchUpdateOptions | CallOptions | BatchUpdateCallback, cb?: BatchUpdateCallback ): Promise | void { - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const gaxOpts = - 'gaxOptions' in options - ? (options as BatchUpdateOptions).gaxOptions - : options; - - if (!Array.isArray(queries) || !queries.length) { - const rowCounts: number[] = []; - const error = new Error('batchUpdate requires at least 1 DML statement.'); - const batchError: BatchUpdateError = Object.assign(error, { - code: 3, // invalid argument - rowCounts, - }) as BatchUpdateError; - callback!(batchError, rowCounts); - return; - } + return startTrace('Transaction.batchUpdate', {}, span => { + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const gaxOpts = + 'gaxOptions' in options + ? (options as BatchUpdateOptions).gaxOptions + : options; + + if (!Array.isArray(queries) || !queries.length) { + const rowCounts: number[] = []; + const error = new Error( + 'batchUpdate requires at least 1 DML statement.' + ); + const batchError: BatchUpdateError = Object.assign(error, { + code: 3, // invalid argument + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchError); + span.end(); + callback!(batchError, rowCounts); + return; + } - const statements: spannerClient.spanner.v1.ExecuteBatchDmlRequest.IStatement[] = - queries.map(query => { - if (typeof query === 'string') { - return {sql: query}; - } - const {sql} = query; - const {params, paramTypes} = Snapshot.encodeParams(query); - return {sql, params, paramTypes}; - }); + const statements: spannerClient.spanner.v1.ExecuteBatchDmlRequest.IStatement[] = + queries.map(query => { + if (typeof query === 'string') { + return {sql: query}; + } + const {sql} = query; + const {params, paramTypes} = Snapshot.encodeParams(query); + return {sql, params, paramTypes}; + }); - const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; - if (this.id) { - transaction.id = this.id as Uint8Array; - } else { - transaction.begin = this._options; - } - const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = { - session: this.session.formattedName_!, - requestOptions: this.configureTagOptions( - false, - this.requestOptions?.transactionTag ?? undefined, - (options as BatchUpdateOptions).requestOptions - ), - transaction, - seqno: this._seqno++, - statements, - } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; + const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; + if (this.id) { + transaction.id = this.id as Uint8Array; + } else { + transaction.begin = this._options; + } + const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = { + session: this.session.formattedName_!, + requestOptions: this.configureTagOptions( + false, + this.requestOptions?.transactionTag ?? undefined, + (options as BatchUpdateOptions).requestOptions + ), + transaction, + seqno: this._seqno++, + statements, + } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'executeBatchDml', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse - ) => { - let batchUpdateError: BatchUpdateError; - - if (err) { - const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); - callback!(batchUpdateError, rowCounts, resp); - return; - } + this.request( + { + client: 'SpannerClient', + method: 'executeBatchDml', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse + ) => { + let batchUpdateError: BatchUpdateError; + + if (err) { + const rowCounts: number[] = []; + batchUpdateError = Object.assign(err, {rowCounts}); + setSpanError(span, batchUpdateError); + span.end(); + callback!(batchUpdateError, rowCounts, resp); + return; + } - const {resultSets, status} = resp; - for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata?.transaction) { - this._update(resultSet.metadata.transaction); + const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } + const rowCounts: number[] = resultSets.map(({stats}) => { + return ( + (stats && + Number( + stats[ + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + ] + )) || + 0 + ); + }); + + if (status && status.code !== 0) { + const error = new Error(status.message!); + batchUpdateError = Object.assign(error, { + code: status.code, + metadata: Transaction.extractKnownMetadata(status.details!), + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchUpdateError); } - } - const rowCounts: number[] = resultSets.map(({stats}) => { - return ( - (stats && - Number( - stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! - ] - )) || - 0 - ); - }); - if (status && status.code !== 0) { - const error = new Error(status.message!); - batchUpdateError = Object.assign(error, { - code: status.code, - metadata: Transaction.extractKnownMetadata(status.details!), - rowCounts, - }) as BatchUpdateError; + span.end(); + callback!(batchUpdateError!, rowCounts, resp); } - - callback!(batchUpdateError!, rowCounts, resp); - } - ); + ); + }); } private static extractKnownMetadata( @@ -1937,71 +1992,85 @@ export class Transaction extends Dml { optionsOrCallback?: CommitOptions | CallOptions | CommitCallback, cb?: CommitCallback ): void | Promise { - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const gaxOpts = - 'gaxOptions' in options ? (options as CommitOptions).gaxOptions : options; - - const mutations = this._queuedMutations; - const session = this.session.formattedName_!; - const requestOptions = (options as CommitOptions).requestOptions; - const reqOpts: CommitRequest = {mutations, session, requestOptions}; + return startTrace('Transaction.commit', {}, span => { + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const gaxOpts = + 'gaxOptions' in options + ? (options as CommitOptions).gaxOptions + : options; + + const mutations = this._queuedMutations; + const session = this.session.formattedName_!; + const requestOptions = (options as CommitOptions).requestOptions; + const reqOpts: CommitRequest = {mutations, session, requestOptions}; - if (this.id) { - reqOpts.transactionId = this.id as Uint8Array; - } else if (!this._useInRunner) { - reqOpts.singleUseTransaction = this._options; - } else { - this.begin().then(() => this.commit(options, callback), callback); - return; - } + if (this.id) { + reqOpts.transactionId = this.id as Uint8Array; + } else if (!this._useInRunner) { + reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => { + span.end(); + this.commit(options, callback); + }, callback); + return; + } - if ( - 'returnCommitStats' in options && - (options as CommitOptions).returnCommitStats - ) { - reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats; - } - if ( - 'maxCommitDelay' in options && - (options as CommitOptions).maxCommitDelay - ) { - reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; - } - reqOpts.requestOptions = Object.assign( - requestOptions || {}, - this.requestOptions - ); + if ( + 'returnCommitStats' in options && + (options as CommitOptions).returnCommitStats + ) { + reqOpts.returnCommitStats = ( + options as CommitOptions + ).returnCommitStats; + } + if ( + 'maxCommitDelay' in options && + (options as CommitOptions).maxCommitDelay + ) { + reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; + } + reqOpts.requestOptions = Object.assign( + requestOptions || {}, + this.requestOptions + ); - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'commit', - reqOpts, - gaxOpts: gaxOpts, - headers: headers, - }, - (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { - this.end(); - - if (resp && resp.commitTimestamp) { - this.commitTimestampProto = resp.commitTimestamp; - this.commitTimestamp = new PreciseDate( - resp.commitTimestamp as DateStruct - ); - } - err = Transaction.decorateCommitError(err as ServiceError, mutations); + this.request( + { + client: 'SpannerClient', + method: 'commit', + reqOpts, + gaxOpts: gaxOpts, + headers: headers, + }, + (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { + this.end(); + + if (err) { + setSpanError(span, err); + } - callback!(err as ServiceError | null, resp); - } - ); + if (resp && resp.commitTimestamp) { + this.commitTimestampProto = resp.commitTimestamp; + this.commitTimestamp = new PreciseDate( + resp.commitTimestamp as DateStruct + ); + } + err = Transaction.decorateCommitError(err as ServiceError, mutations); + + span.end(); + callback!(err as ServiceError | null, resp); + } + ); + }); } /** @@ -2287,45 +2356,52 @@ export class Transaction extends Dml { | spannerClient.spanner.v1.Spanner.RollbackCallback, cb?: spannerClient.spanner.v1.Spanner.RollbackCallback ): void | Promise { - const gaxOpts = - typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; - const callback = - typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - - if (!this.id) { - callback!( - new Error( + return startTrace('Transaction.rollback', {}, span => { + const gaxOpts = + typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + const callback = + typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + + if (!this.id) { + const err = new Error( 'Transaction ID is unknown, nothing to rollback.' - ) as ServiceError - ); - return; - } + ) as ServiceError; + setSpanError(span, err); + span.end(); + callback!(err); + return; + } - const session = this.session.formattedName_!; - const transactionId = this.id; - const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { - session, - transactionId, - }; + const session = this.session.formattedName_!; + const transactionId = this.id; + const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { + session, + transactionId, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } - - this.request( - { - client: 'SpannerClient', - method: 'rollback', - reqOpts, - gaxOpts, - headers: headers, - }, - (err: null | ServiceError) => { - this.end(); - callback!(err); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + + this.request( + { + client: 'SpannerClient', + method: 'rollback', + reqOpts, + gaxOpts, + headers: headers, + }, + (err: null | ServiceError) => { + this.end(); + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err); + } + ); + }); } /** diff --git a/test/session-pool.ts b/test/session-pool.ts index 43599585d..292055d45 100644 --- a/test/session-pool.ts +++ b/test/session-pool.ts @@ -30,6 +30,23 @@ import * as sp from '../src/session-pool'; import {Transaction} from '../src/transaction'; import {grpc} from 'google-gax'; +const {ContextManager} = require('@opentelemetry/api'); +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const { + disableContextAndManager, + setGlobalContextManager, + getTracer, + setTracerProvider, +} = require('../src/instrument'); + let pQueueOverride: typeof PQueue | null = null; function FakePQueue(options) { @@ -1349,6 +1366,82 @@ describe('SessionPool', () => { }); }); }); + + describe('observability annotations on active span', () => { + const projectId = process.env.SPANNER_TEST_PROJECTID || 'test-project'; + const exporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + let provider: typeof NodeTracerProvider; + // let contextManager: typeof ContextManager; + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); + + beforeEach(() => { + provider = new NodeTracerProvider({ + sampler: sampler, + exporter: exporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); + provider.register(); + setTracerProvider(provider); + + sessionPool.isOpen = true; + sessionPool._isValidSession = () => true; + }); + + afterEach(async () => { + exporter.forceFlush(); + exporter.reset(); + }); + + after(async () => { + disableContextAndManager(contextManager); + await provider.shutdown(); + }); + + it('acquire', () => { + getTracer().startActiveSpan('testSessionPool.acquire', async span => { + const fakeSession = createSession(); + const now = Date.now(); + + const stub = sandbox + .stub(sessionPool, '_getSession') + .resolves(fakeSession); + const session = await sessionPool._acquire(); + const [startTime] = stub.getCall(0).args; + + assert(isAround(startTime, now)); + assert.strictEqual(session, fakeSession); + + // TODO: Investigate why the context at this + // point is NOT the same context as was used in + // the "await sessionPool._acquire() call. + await sessionPool._release(session); + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual( + spans.length, + 1, + 'exactly 1 span should have been exported' + ); + const span0 = spans[0]; + assert.strictEqual(!span0.events, false, 'events must be set'); + assert.strictEqual(span0.events.length > 0, true, 'events must be set'); + const events = span0.events; + + // Sort the events by earliest time of occurence. + events.sort((evtA, evtB) => { + return evtA.time < evtB.time; + }); + + // Now check to see that we at least acquired a valid session. + const event0 = events[0]; + assert.strictEqual(event0.name, 'acquired a valid session'); + }); + }); + }); }); function isAround(actual, expected) { diff --git a/tsconfig.json b/tsconfig.json index 3824d3ff5..a72e8f089 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -19,6 +19,7 @@ "test/*.ts", "test/**/*.ts", "system-test/*.ts", + "observability-test/*.ts", "benchmark/*.ts" ] }