diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index d55221e8e1e9..1214c5172d93 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -32,7 +32,7 @@ import { } from "../serviceBusMessage"; import { ClientEntityContext } from "../clientEntityContext"; import { LinkEntity } from "./linkEntity"; -import { getUniqueName, normalizeRetryOptions, RetryOptionsInternal } from "../util/utils"; +import { getUniqueName } from "../util/utils"; import { throwErrorIfConnectionClosed } from "../util/errors"; import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch"; import { CreateBatchOptions } from "../models"; @@ -77,7 +77,7 @@ export class MessageSender extends LinkEntity { * @property {Sender} [_sender] The AMQP sender link. */ private _sender?: AwaitableSender; - private _retryOptions: RetryOptionsInternal; + private _retryOptions: RetryOptions; /** * Creates a new MessageSender instance. @@ -89,7 +89,7 @@ export class MessageSender extends LinkEntity { address: context.entityPath, audience: `${context.namespace.config.endpoint}${context.entityPath}` }); - this._retryOptions = normalizeRetryOptions(retryOptions); + this._retryOptions = retryOptions; this._onAmqpError = (context: EventContext) => { const senderError = context.sender && context.sender.error; if (senderError) { @@ -254,6 +254,10 @@ export class MessageSender extends LinkEntity { options: OperationOptions | undefined ): Promise { const abortSignal = options?.abortSignal; + let timeoutInMs = + this._retryOptions.timeoutInMs == undefined + ? Constants.defaultOperationTimeoutInMs + : this._retryOptions.timeoutInMs; const sendEventPromise = () => new Promise(async (resolve, reject) => { @@ -279,7 +283,7 @@ export class MessageSender extends LinkEntity { description: desc }; return rejectInitTimeoutPromise(translate(e)); - }, this._retryOptions.timeoutInMs); + }, timeoutInMs); }); try { @@ -325,9 +329,20 @@ export class MessageSender extends LinkEntity { ); } if (this._sender!.sendable()) { + if (timeoutInMs <= timeTakenByInit) { + const desc: string = + `[${this._context.namespace.connectionId}] Sender "${this.name}" ` + + `with address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + log.error(desc); + const e: AmqpError = { + condition: ErrorNameConditionMapper.ServiceUnavailableError, + description: desc + }; + return reject(translate(e)); + } try { - this._sender!.sendTimeoutInSeconds = - (this._retryOptions.timeoutInMs - timeTakenByInit) / 1000; + this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; const delivery = await this._sender!.send( encodedMessage, undefined, diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index aef82cb2aa41..9e03a021c901 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -15,7 +15,6 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models"; import { Receiver, ReceiverImpl } from "./receivers/receiver"; import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver"; import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage"; -import { getRetryAttemptTimeoutInMs } from "./util/utils"; /** * A client that can create Sender instances for sending messages to queues and @@ -83,9 +82,14 @@ export class ServiceBusClient { } this.fullyQualifiedNamespace = this._connectionContext.config.host; this._clientOptions.retryOptions = this._clientOptions.retryOptions || {}; - this._clientOptions.retryOptions.timeoutInMs = getRetryAttemptTimeoutInMs( - this._clientOptions.retryOptions - ); + + const timeoutInMs = this._clientOptions.retryOptions.timeoutInMs; + if ( + timeoutInMs != undefined && + (typeof timeoutInMs !== "number" || !isFinite(timeoutInMs) || timeoutInMs <= 0) + ) { + throw new Error(`${timeoutInMs} is an invalid value for retryOptions.timeoutInMs`); + } } /** diff --git a/sdk/servicebus/service-bus/src/util/utils.ts b/sdk/servicebus/service-bus/src/util/utils.ts index d42a2d643c3b..26be40bac87f 100644 --- a/sdk/servicebus/service-bus/src/util/utils.ts +++ b/sdk/servicebus/service-bus/src/util/utils.ts @@ -7,7 +7,6 @@ import { generate_uuid } from "rhea-promise"; import isBuffer from "is-buffer"; import { Buffer } from "buffer"; import * as Constants from "../util/constants"; -import { Constants as CoreAMQPConstants, RetryOptions } from "@azure/core-amqp"; // This is the only dependency we have on DOM types, so rather than require // the DOM lib we can just shim this in. @@ -42,47 +41,6 @@ export function getUniqueName(name: string): string { return `${name}-${generate_uuid()}`; } -/** - * @internal - * @ignore - * - * TODO: I think this is duplicated from core-amqp and should be du-duped, but _before_ - * that happens we should question whether it's even a legitimate way of setting the timeout - * because it just squashes all timeouts beneath 60 seconds to be 60 seconds instead. - */ -export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number { - const timeoutInMs = - retryOptions == undefined || - typeof retryOptions.timeoutInMs !== "number" || - !isFinite(retryOptions.timeoutInMs) || - // TODO: not sure what the justification is for always forcing at least 60 seconds. - retryOptions.timeoutInMs < CoreAMQPConstants.defaultOperationTimeoutInMs - ? CoreAMQPConstants.defaultOperationTimeoutInMs - : retryOptions.timeoutInMs; - - return timeoutInMs; -} - -/** - * @internal - * @ignore - */ -export type RetryOptionsInternal = Required> & - Exclude; - -/** - * @internal - * @ignore - */ -export function normalizeRetryOptions( - retryOptions: RetryOptions | undefined -): RetryOptionsInternal { - return { - ...retryOptions, - timeoutInMs: getRetryAttemptTimeoutInMs(retryOptions) - }; -} - /** * @internal * @ignore