Skip to content

Commit

Permalink
[Service Bus] Allow timeout lesser than 60 seconds (#8805)
Browse files Browse the repository at this point in the history
* Move the duplicated getRetryAttemptTimeoutInMs from event-hubs and service-bus to the core-amqp

* changelog

* updates as suggested

* Not export getRetryAttemptTimeoutInMs from core-amqp

* undo changelog

* undo version updates

* fix event-hubs

* remove excess space

* remove the helper method in service-bus

* throw errors for invalid timeouts

* this._retryOptions.timeoutInMs! <= timeTakenByInit guard

* get rid of ! from timeoutInMs!

* Undo changes to event-hubs

* Changes suggested by Ramya

* Update sdk/servicebus/service-bus/src/serviceBusClient.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>
  • Loading branch information
HarshaNalluru and ramya-rao-a authored May 20, 2020
1 parent c0c8bce commit 1028761
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 52 deletions.
27 changes: 21 additions & 6 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, normalizeRetryOptions, RetryOptionsInternal } from "../util/utils";
import { getUniqueName } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -77,7 +77,7 @@ export class MessageSender extends LinkEntity {
* @property {Sender} [_sender] The AMQP sender link.
*/
private _sender?: AwaitableSender;
private _retryOptions: RetryOptionsInternal;
private _retryOptions: RetryOptions;

/**
* Creates a new MessageSender instance.
Expand All @@ -89,7 +89,7 @@ export class MessageSender extends LinkEntity {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
});
this._retryOptions = normalizeRetryOptions(retryOptions);
this._retryOptions = retryOptions;
this._onAmqpError = (context: EventContext) => {
const senderError = context.sender && context.sender.error;
if (senderError) {
Expand Down Expand Up @@ -254,6 +254,10 @@ export class MessageSender extends LinkEntity {
options: OperationOptions | undefined
): Promise<void> {
const abortSignal = options?.abortSignal;
let timeoutInMs =
this._retryOptions.timeoutInMs == undefined
? Constants.defaultOperationTimeoutInMs
: this._retryOptions.timeoutInMs;

const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
Expand All @@ -279,7 +283,7 @@ export class MessageSender extends LinkEntity {
description: desc
};
return rejectInitTimeoutPromise(translate(e));
}, this._retryOptions.timeoutInMs);
}, timeoutInMs);
});

try {
Expand Down Expand Up @@ -325,9 +329,20 @@ export class MessageSender extends LinkEntity {
);
}
if (this._sender!.sendable()) {
if (timeoutInMs <= timeTakenByInit) {
const desc: string =
`[${this._context.namespace.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
}
try {
this._sender!.sendTimeoutInSeconds =
(this._retryOptions.timeoutInMs - timeTakenByInit) / 1000;
this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
const delivery = await this._sender!.send(
encodedMessage,
undefined,
Expand Down
12 changes: 8 additions & 4 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models";
import { Receiver, ReceiverImpl } from "./receivers/receiver";
import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver";
import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage";
import { getRetryAttemptTimeoutInMs } from "./util/utils";

/**
* A client that can create Sender instances for sending messages to queues and
Expand Down Expand Up @@ -83,9 +82,14 @@ export class ServiceBusClient {
}
this.fullyQualifiedNamespace = this._connectionContext.config.host;
this._clientOptions.retryOptions = this._clientOptions.retryOptions || {};
this._clientOptions.retryOptions.timeoutInMs = getRetryAttemptTimeoutInMs(
this._clientOptions.retryOptions
);

const timeoutInMs = this._clientOptions.retryOptions.timeoutInMs;
if (
timeoutInMs != undefined &&
(typeof timeoutInMs !== "number" || !isFinite(timeoutInMs) || timeoutInMs <= 0)
) {
throw new Error(`${timeoutInMs} is an invalid value for retryOptions.timeoutInMs`);
}
}

/**
Expand Down
42 changes: 0 additions & 42 deletions sdk/servicebus/service-bus/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { generate_uuid } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import * as Constants from "../util/constants";
import { Constants as CoreAMQPConstants, RetryOptions } from "@azure/core-amqp";

// This is the only dependency we have on DOM types, so rather than require
// the DOM lib we can just shim this in.
Expand Down Expand Up @@ -42,47 +41,6 @@ export function getUniqueName(name: string): string {
return `${name}-${generate_uuid()}`;
}

/**
* @internal
* @ignore
*
* TODO: I think this is duplicated from core-amqp and should be du-duped, but _before_
* that happens we should question whether it's even a legitimate way of setting the timeout
* because it just squashes all timeouts beneath 60 seconds to be 60 seconds instead.
*/
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
const timeoutInMs =
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
// TODO: not sure what the justification is for always forcing at least 60 seconds.
retryOptions.timeoutInMs < CoreAMQPConstants.defaultOperationTimeoutInMs
? CoreAMQPConstants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;

return timeoutInMs;
}

/**
* @internal
* @ignore
*/
export type RetryOptionsInternal = Required<Pick<RetryOptions, "timeoutInMs">> &
Exclude<RetryOptions, "timeoutInMs">;

/**
* @internal
* @ignore
*/
export function normalizeRetryOptions(
retryOptions: RetryOptions | undefined
): RetryOptionsInternal {
return {
...retryOptions,
timeoutInMs: getRetryAttemptTimeoutInMs(retryOptions)
};
}

/**
* @internal
* @ignore
Expand Down

0 comments on commit 1028761

Please sign in to comment.