From afa587386dbafa81fb177634e36f49a6093b9f37 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 22 Sep 2024 02:34:45 -1000 Subject: [PATCH] feat(observability): trace Transaction This change adds observability tracing for Transaction along with tests. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/transaction.ts | 192 +++++++++ src/transaction.ts | 676 +++++++++++++++++------------- 2 files changed, 582 insertions(+), 286 deletions(-) create mode 100644 observability-test/transaction.ts diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts new file mode 100644 index 000000000..fa4234904 --- /dev/null +++ b/observability-test/transaction.ts @@ -0,0 +1,192 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {grpc} from 'google-gax'; +import {google} from '../protos/protos'; +import {Database, Spanner, Transaction} from '../src'; +import protobuf = google.spanner.v1; +import * as mock from '../test/mockserver/mockspanner'; +import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; +import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +/** A simple result set for SELECT 1. */ +function createSelect1ResultSet(): protobuf.ResultSet { + const fields = [ + protobuf.StructType.Field.create({ + name: 'NUM', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + ]; + const metadata = new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + return protobuf.ResultSet.create({ + metadata, + rows: [{values: [{stringValue: '1'}]}], + }); +} + +interface setupResults { + server: grpc.Server; + spanner: Spanner; + spannerMock: mock.MockSpanner; +} + +const selectSql = 'SELECT 1'; +const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + +async function setup(): Promise { + const server = new grpc.Server(); + + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + + const port: number = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + const spanner = new Spanner({ + projectId: 'observability-project-id', + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + }); + + return Promise.resolve({ + spanner: spanner, + server: server, + spannerMock: spannerMock, + }); +} + +describe('Transaction', () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + }); + + before(async () => { + const setupResult = await setup(); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + const upsertSql = 'INSERTORUPDATE INTO FOO(BAR, BAZ) VALUES(@bar, @baz)'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + upsertSql, + mock.StatementResult.updateCount(1) + ); + + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const instance = spanner.instance('instance'); + database = instance.database('database'); + database.observabilityConfig = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + }); + + beforeEach(() => { + spannerMock.resetRequests(); + }); + + afterEach(() => { + traceExporter.reset(); + }); + + it('run', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + tx!.run('SELECT 1', (err, rows) => { + // Reset the trace exporter to clear any prior spans + // so that we only have spans for Transaction. + traceExporter.forceFlush(); + traceExporter.reset(); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Transaction.upsert']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + }); +}); diff --git a/src/transaction.ts b/src/transaction.ts index 1a375ff0f..1c3a50aeb 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,7 +22,7 @@ import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; import {common as p} from 'protobufjs'; -import {Readable, PassThrough} from 'stream'; +import {finished, Readable, PassThrough, Stream} from 'stream'; import {codec, Json, JSONOptions, Type, Value} from './codec'; import { @@ -46,7 +46,12 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -286,7 +291,7 @@ export class Snapshot extends EventEmitter { queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; requestOptions?: Pick; - observabilityOptions?: ObservabilityOptions; + observabilityOptions?: ObservabilityOptions | undefined; /** * The transaction ID. @@ -351,6 +356,7 @@ export class Snapshot extends EventEmitter { }; this._waitingRequests = []; this._inlineBeginStarted = false; + this.observabilityOptions = session.observabilityConfig; } /** @@ -416,9 +422,6 @@ export class Snapshot extends EventEmitter { options, }; - const span = getActiveOrNoopSpan(); - span.addEvent('Begin Transaction'); - // Only hand crafted read-write transactions will be able to set a // transaction tag for the BeginTransaction RPC. Also, this.requestOptions // is only set in the constructor of Transaction, which is the constructor @@ -436,26 +439,34 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'beginTransaction', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ITransaction - ) => { - if (err) { - callback!(err, resp); - return; + const q = {opts: this.observabilityOptions}; + return startTrace('Snapshot.begin', q, span => { + span.addEvent('Begin Transaction'); + + this.request( + { + client: 'SpannerClient', + method: 'beginTransaction', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ITransaction + ) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, resp); + return; + } + this._update(resp); + span.end(); + callback!(null, resp); } - this._update(resp); - callback!(null, resp); - } - ); + ); + }); } /** @@ -692,45 +703,59 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (this.id && transaction.begin) { - delete transaction.begin; - transaction.id = this.id; - } - return this.requestStream({ - client: 'SpannerClient', - method: 'streamingRead', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - ?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); + const q = {tableName: table, opts: this.observabilityOptions}; + return startTrace('Snapshot.createReadStream', q, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (this.id && transaction.begin) { + delete transaction.begin; + transaction.id = this.id; } + return this.requestStream({ + client: 'SpannerClient', + method: 'streamingRead', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const prs = partialResultStream(this._wrapWithIdWaiter(makeRequest), { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + setSpanError(span, err); + }); + + finished(prs, err => { + if (err) { + setSpanError(span, err); } + span.end(); }); + + return prs; + }); } /** @@ -925,10 +950,19 @@ export class Snapshot extends EventEmitter { callback = cb as ReadCallback; } - this.createReadStream(table, request) - .on('error', callback!) - .on('data', row => rows.push(row)) - .on('end', () => callback!(null, rows)); + const q = {tableName: table, opts: this.observabilityOptions}; + return startTrace('Snapshot.read', q, span => { + this.createReadStream(table, request) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, null); + }) + .on('data', row => rows.push(row)) + .on('end', () => { + span.end(); + callback!(null, rows); + }); + }); } /** @@ -1018,19 +1052,29 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - this.runStream(query) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); + const q = {sql: query, opts: this.observabilityOptions}; + return startTrace('Snapshot.run', q, span => { + this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + setSpanError(span, err); + span.end(); + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } - } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => callback!(null, rows, stats, metadata)); + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** @@ -1201,51 +1245,70 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts || (this.id && !reqOpts.transaction.id)) { - try { - sanitizeRequest(); - } catch (e) { - const errorStream = new PassThrough(); - setImmediate(() => errorStream.destroy(e as Error)); - return errorStream; + const q = {opts: this.observabilityOptions}; + Object.assign(q, query); + return startTrace('Snapshot.runStream', q, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { + try { + sanitizeRequest(); + } catch (e) { + const errorStream = new PassThrough(); + setSpanErrorAndException(span, e as Error); + span.end(); + setImmediate(() => errorStream.destroy(e as Error)); + return errorStream; + } } - } - - return this.requestStream({ - client: 'SpannerClient', - method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - .on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } + return this.requestStream({ + client: 'SpannerClient', + method: 'executeStreamingSql', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const prs = partialResultStream(this._wrapWithIdWaiter(makeRequest), { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); - } - }); + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + setSpanError(span, err as Error); + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + }); + + if (prs instanceof Stream) { + finished(prs, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return prs; + }); } /** @@ -1543,22 +1606,31 @@ export class Dml extends Snapshot { query = {sql: query} as ExecuteSqlRequest; } - this.run( - query, - ( - err: null | grpc.ServiceError, - rows: Rows, - stats: spannerClient.spanner.v1.ResultSetStats - ) => { - let rowCount = 0; - - if (stats && stats.rowCount) { - rowCount = Math.floor(stats[stats.rowCount] as number); - } + const q = {opts: this.observabilityOptions}; + Object.assign(q, query); + return startTrace('Dml.runUpdate', q, span => { + this.run( + query, + ( + err: null | grpc.ServiceError, + rows: Rows, + stats: spannerClient.spanner.v1.ResultSetStats + ) => { + let rowCount = 0; + + if (stats && stats.rowCount) { + rowCount = Math.floor(stats[stats.rowCount] as number); + } - callback!(err, rowCount); - } - ); + if (err) { + setSpanError(span, err); + } + + span.end(); + callback!(err, rowCount); + } + ); + }); } } @@ -1812,57 +1884,64 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'executeBatchDml', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse - ) => { - let batchUpdateError: BatchUpdateError; - - if (err) { - const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); - callback!(batchUpdateError, rowCounts, resp); - return; - } + const q = {opts: this.observabilityOptions}; + return startTrace('Transaction.batchUpdate', q, span => { + this.request( + { + client: 'SpannerClient', + method: 'executeBatchDml', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse + ) => { + let batchUpdateError: BatchUpdateError; + + if (err) { + const rowCounts: number[] = []; + batchUpdateError = Object.assign(err, {rowCounts}); + setSpanError(span, batchUpdateError); + span.end(); + callback!(batchUpdateError, rowCounts, resp); + return; + } - const {resultSets, status} = resp; - for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata?.transaction) { - this._update(resultSet.metadata.transaction); + const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } + const rowCounts: number[] = resultSets.map(({stats}) => { + return ( + (stats && + Number( + stats[ + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + ] + )) || + 0 + ); + }); + + if (status && status.code !== 0) { + const error = new Error(status.message!); + batchUpdateError = Object.assign(error, { + code: status.code, + metadata: Transaction.extractKnownMetadata(status.details!), + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchUpdateError); } - } - const rowCounts: number[] = resultSets.map(({stats}) => { - return ( - (stats && - Number( - stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! - ] - )) || - 0 - ); - }); - if (status && status.code !== 0) { - const error = new Error(status.message!); - batchUpdateError = Object.assign(error, { - code: status.code, - metadata: Transaction.extractKnownMetadata(status.details!), - rowCounts, - }) as BatchUpdateError; + span.end(); + callback!(batchUpdateError!, rowCounts, resp); } - - callback!(batchUpdateError!, rowCounts, resp); - } - ); + ); + }); } private static extractKnownMetadata( @@ -1963,81 +2042,91 @@ export class Transaction extends Dml { optionsOrCallback?: CommitOptions | CallOptions | CommitCallback, cb?: CommitCallback ): void | Promise { - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const gaxOpts = - 'gaxOptions' in options ? (options as CommitOptions).gaxOptions : options; - - const mutations = this._queuedMutations; - const session = this.session.formattedName_!; - const requestOptions = (options as CommitOptions).requestOptions; - const reqOpts: CommitRequest = {mutations, session, requestOptions}; + const q = {opts: this.observabilityOptions}; + return startTrace('Transaction.commit', q, span => { + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const gaxOpts = + 'gaxOptions' in options + ? (options as CommitOptions).gaxOptions + : options; + + const mutations = this._queuedMutations; + const session = this.session.formattedName_!; + const requestOptions = (options as CommitOptions).requestOptions; + const reqOpts: CommitRequest = {mutations, session, requestOptions}; - const span = getActiveOrNoopSpan(); - - if (this.id) { - reqOpts.transactionId = this.id as Uint8Array; - } else if (!this._useInRunner) { - reqOpts.singleUseTransaction = this._options; - } else { - this.begin().then(() => this.commit(options, callback), callback); - return; - } - - if ( - 'returnCommitStats' in options && - (options as CommitOptions).returnCommitStats - ) { - reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats; - } - if ( - 'maxCommitDelay' in options && - (options as CommitOptions).maxCommitDelay - ) { - reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; - } - reqOpts.requestOptions = Object.assign( - requestOptions || {}, - this.requestOptions - ); + if (this.id) { + reqOpts.transactionId = this.id as Uint8Array; + } else if (!this._useInRunner) { + reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => { + span.end(); + this.commit(options, callback); + }, callback); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + if ( + 'returnCommitStats' in options && + (options as CommitOptions).returnCommitStats + ) { + reqOpts.returnCommitStats = ( + options as CommitOptions + ).returnCommitStats; + } + if ( + 'maxCommitDelay' in options && + (options as CommitOptions).maxCommitDelay + ) { + reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; + } + reqOpts.requestOptions = Object.assign( + requestOptions || {}, + this.requestOptions + ); - span.addEvent('Starting Commit'); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'commit', - reqOpts, - gaxOpts: gaxOpts, - headers: headers, - }, - (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { - this.end(); + span.addEvent('Starting Commit'); + + this.request( + { + client: 'SpannerClient', + method: 'commit', + reqOpts, + gaxOpts: gaxOpts, + headers: headers, + }, + (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { + this.end(); + + if (err) { + span.addEvent('Commit failed'); + setSpanError(span, err); + } else { + span.addEvent('Commit Done'); + } - if (err) { - span.addEvent('Commit failed'); - } else { - span.addEvent('Commit Done'); - } + if (resp && resp.commitTimestamp) { + this.commitTimestampProto = resp.commitTimestamp; + this.commitTimestamp = new PreciseDate( + resp.commitTimestamp as DateStruct + ); + } + err = Transaction.decorateCommitError(err as ServiceError, mutations); - if (resp && resp.commitTimestamp) { - this.commitTimestampProto = resp.commitTimestamp; - this.commitTimestamp = new PreciseDate( - resp.commitTimestamp as DateStruct - ); + span.end(); + callback!(err as ServiceError | null, resp); } - err = Transaction.decorateCommitError(err as ServiceError, mutations); - - callback!(err as ServiceError | null, resp); - } - ); + ); + }); } /** @@ -2323,45 +2412,53 @@ export class Transaction extends Dml { | spannerClient.spanner.v1.Spanner.RollbackCallback, cb?: spannerClient.spanner.v1.Spanner.RollbackCallback ): void | Promise { - const gaxOpts = - typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; - const callback = - typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - - if (!this.id) { - callback!( - new Error( + const q = {opts: this.observabilityOptions}; + return startTrace('Transaction.rollback', q, span => { + const gaxOpts = + typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + const callback = + typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + + if (!this.id) { + const err = new Error( 'Transaction ID is unknown, nothing to rollback.' - ) as ServiceError - ); - return; - } - - const session = this.session.formattedName_!; - const transactionId = this.id; - const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { - session, - transactionId, - }; + ) as ServiceError; + setSpanError(span, err); + span.end(); + callback!(err); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const session = this.session.formattedName_!; + const transactionId = this.id; + const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { + session, + transactionId, + }; - this.request( - { - client: 'SpannerClient', - method: 'rollback', - reqOpts, - gaxOpts, - headers: headers, - }, - (err: null | ServiceError) => { - this.end(); - callback!(err); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + + this.request( + { + client: 'SpannerClient', + method: 'rollback', + reqOpts, + gaxOpts, + headers: headers, + }, + (err: null | ServiceError) => { + this.end(); + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err); + } + ); + }); } /** @@ -2813,9 +2910,16 @@ export class PartitionedDml extends Dml { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - super.runUpdate(query, (err, count) => { - this.end(); - callback!(err, count); + const q = {sql: query, opts: this.observabilityOptions}; + return startTrace('PartitionedDml.runUpdate', q, span => { + super.runUpdate(query, (err, count) => { + if (err) { + setSpanError(span, err); + } + this.end(); + span.end(); + callback!(err, count); + }); }); } }