Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] migrate to @azure/core-tracing v1.0.0 #22517

Merged
merged 14 commits into from
Jul 12, 2022
Merged
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.6.1 (Unreleased)
## 7.7.0 (Unreleased)

### Features Added

Expand All @@ -10,6 +10,8 @@

### Other Changes

- Updated our `@azure/core-tracing` dependency to the latest version (v1.0.0)

## 7.6.0 (2022-07-07)

### Other Changes
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.6.1",
"version": "7.7.0",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/servicebus/service-bus/",
Expand Down Expand Up @@ -110,7 +110,7 @@
"@azure/core-auth": "^1.3.0",
"@azure/core-client": "^1.0.0",
"@azure/core-rest-pipeline": "^1.1.0",
"@azure/core-tracing": "1.0.0-preview.13",
"@azure/core-tracing": "^1.0.0",
"@azure/core-paging": "^1.1.1",
"@azure/core-util": "^1.0.0",
"@azure/core-xml": "^1.0.0",
Expand Down
7 changes: 2 additions & 5 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import { RetryMode } from '@azure/core-amqp';
import { RetryOptions } from '@azure/core-amqp';
import { SASCredential } from '@azure/core-auth';
import { ServiceClient } from '@azure/core-client';
import { Span } from '@azure/core-tracing';
import { SpanContext } from '@azure/core-tracing';
import { TokenCredential } from '@azure/core-auth';
import { TokenType } from '@azure/core-amqp';
import { TracingContext } from '@azure/core-tracing';
import { UserAgentPolicyOptions } from '@azure/core-rest-pipeline';
import { WebSocketImpl } from 'rhea-promise';
import { WebSocketOptions } from '@azure/core-amqp';
Expand Down Expand Up @@ -448,7 +447,7 @@ export interface ServiceBusMessageBatch {
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly _messageSpanContexts: TracingContext[];
readonly sizeInBytes: number;
tryAddMessage(message: ServiceBusMessage | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
}
Expand Down Expand Up @@ -637,8 +636,6 @@ export type TransferProgressEvent = {

// @public
export interface TryAddOptions {
// @deprecated (undocumented)
parentSpan?: Span | SpanContext | null;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor version bump probably enough for this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

tracingOptions?: OperationTracingOptions;
}

Expand Down
18 changes: 8 additions & 10 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/abort-controller";
import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { toProcessingSpanOptions } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { tracingClient } from "../diagnostics/tracing";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -238,11 +239,6 @@ interface ReceiveMessageArgs extends OperationOptionsBase {
* @internal
*/
export class BatchingReceiverLite {
/**
* NOTE: exists only to make unit testing possible.
*/
private _createAndEndProcessingSpan: typeof createAndEndProcessingSpan;

constructor(
private _connectionContext: ConnectionContext,
public entityPath: string,
Expand All @@ -252,8 +248,6 @@ export class BatchingReceiverLite {
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
context.message!,
Expand Down Expand Up @@ -301,8 +295,12 @@ export class BatchingReceiverLite {
const messages = await new Promise<ServiceBusMessageImpl[]>((resolve, reject) =>
this._receiveMessagesImpl(receiver, args, resolve, reject)
);
this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args);
return messages;
return tracingClient.withSpan(
"BatchingReceiverLite.process",
args,
() => messages,
toProcessingSpanOptions(messages, this, this._connectionContext.config)
);
} finally {
this._closeHandler = undefined;
this.isReceivingMessages = false;
Expand Down
13 changes: 9 additions & 4 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
RetryOptions,
ConditionErrorNameMapper,
} from "@azure/core-amqp";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { receiverLogger as logger } from "../log";
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
Expand All @@ -27,8 +27,9 @@ import {
ProcessErrorArgs,
SubscribeOptions,
} from "../models";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { toProcessingSpanOptions } from "../diagnostics/instrumentServiceBusMessage";
import { AbortError } from "@azure/abort-controller";
import { tracingClient } from "../diagnostics/tracing";

/**
* @internal
Expand Down Expand Up @@ -484,8 +485,12 @@ export class StreamingReceiver extends MessageReceiver {
},
processMessage: async (message: ServiceBusMessageImpl) => {
try {
const span = createProcessingSpan(message, this, this._context.config, operationOptions);
return await trace(() => userHandlers.processMessage(message), span);
await tracingClient.withSpan(
"StreamReceiver.process",
operationOptions ?? {},
() => userHandlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
);
} catch (err: any) {
this._messageHandlers().processError({
error: err,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,114 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import {
extractSpanContextFromTraceParentHeader,
SpanStatusCode,
Link,
Span,
SpanContext,
SpanKind,
} from "@azure/core-tracing";
import { TracingContext, TracingSpanLink, TracingSpanOptions } from "@azure/core-tracing";
import { ConnectionContext } from "../connectionContext";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";
import { createServiceBusSpan } from "./tracing";
import { toSpanOptions, tracingClient } from "./tracing";

/**
* @hidden
* @internal
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* @hidden
*/
export interface InstrumentableMessage {
/**
* The application specific properties which can be
* used for custom message metadata.
*/
applicationProperties?: { [key: string]: number | boolean | string | Date | null };
}

/**
* Instruments an AMQP message with a proper `Diagnostic-Id` for tracing.
*
* @hidden
*/
export function instrumentMessage<T extends InstrumentableMessage>(
message: T,
options: OperationOptionsBase,
entityPath: string,
host: string
): {
/**
* If instrumentation was done, a copy of the message with
* message.applicationProperties['Diagnostic-Id'] filled
* out appropriately.
*/
message: T;

/**
* A valid SpanContext if this message should be linked to a parent span, or undefined otherwise.
*/
spanContext: TracingContext | undefined;
} {
// check if the event has already been instrumented
const previouslyInstrumented = Boolean(message.applicationProperties?.[TRACEPARENT_PROPERTY]);

if (previouslyInstrumented) {
return {
message,
spanContext: undefined,
};
}

const { span: messageSpan, updatedOptions } = tracingClient.startSpan(
"message",
options,
toSpanOptions({ entityPath, host }, "producer")
);

try {
if (!messageSpan.isRecording()) {
return {
message,
spanContext: undefined,
};
}

const traceParent = tracingClient.createRequestHeaders(
updatedOptions.tracingOptions?.tracingContext
)["traceparent"];

if (traceParent) {
// create a copy so the original isn't modified
message = {
...message,
applicationProperties: {
...message.applicationProperties,
[TRACEPARENT_PROPERTY]: traceParent,
},
};
}

return {
message,
spanContext: updatedOptions.tracingOptions?.tracingContext,
};
} finally {
messageSpan.end();
}
}

/**
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
* @param message - An individual `ServiceBusMessage` object.
* @internal
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
): TracingContext | undefined {
if (!message.applicationProperties || !message.applicationProperties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.applicationProperties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
return tracingClient.parseTraceparentHeader(diagnosticId);
}

/**
Expand All @@ -56,70 +131,28 @@ function* getReceivedMessages(
}

/**
* A span that encompasses the period when the message has been received and
* is being processed.
*
* NOTE: The amount of time the user would be considered processing the message is
* not always clear - in that case the span will have a very short lifetime
* since we'll start the span when we receive the message and end it when we
* give the message to the user.
*
* @internal
*/
export function createProcessingSpan(
export function toProcessingSpanOptions(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
// NOTE: the connectionConfig also has an entityPath property but that only
// represents the optional entityPath in their connection string which is NOT
// what we want for tracing.
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): Span {
const links: Link[] = [];

connectionConfig: Pick<ConnectionContext["config"], "host">
): TracingSpanOptions {
const spanLinks: TracingSpanLink[] = [];
for (const receivedMessage of getReceivedMessages(receivedMessages)) {
const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage);

if (spanContext == null) {
continue;
const tracingContext = extractSpanContextFromServiceBusMessage(receivedMessage);
if (tracingContext) {
spanLinks.push({
tracingContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime(),
},
});
}

links.push({
context: spanContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime(),
},
});
}

const { span } = createServiceBusSpan(
"process",
options,
receiver.entityPath,
connectionConfig.host,
{
kind: SpanKind.CONSUMER,
links,
}
);

return span;
}

/**
* Creates and immediately ends a processing span. Used when
* the 'processing' occurs outside of our control so we don't
* know the scope.
*
* @internal
*/
export function createAndEndProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): void {
const span = createProcessingSpan(receivedMessages, receiver, connectionConfig, options);
span.setStatus({ code: SpanStatusCode.OK });
span.end();
return {
spanLinks,
spanKind: "consumer",
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }),
};
}
Loading