Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Allow timeout lesser than 60 seconds #8805

Merged
merged 16 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions sdk/eventhub/event-hubs/src/util/retries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
import { Constants, RetryOptions } from "@azure/core-amqp";

/**
* @internal
* @ignore
* Invalid timeouts, non-positive timeouts are defaulted to the `Constants.defaultOperationTimeoutInMs`
*
* @export
* @param {(RetryOptions | undefined)} retryOptions
* @returns {number}
*/
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
const timeoutInMs =
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs
retryOptions.timeoutInMs <= 0
? Constants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;
return timeoutInMs;
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, normalizeRetryOptions, RetryOptionsInternal } from "../util/utils";
import { getUniqueName } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -77,7 +77,7 @@ export class MessageSender extends LinkEntity {
* @property {Sender} [_sender] The AMQP sender link.
*/
private _sender?: AwaitableSender;
private _retryOptions: RetryOptionsInternal;
private _retryOptions: RetryOptions;

/**
* Creates a new MessageSender instance.
Expand All @@ -89,7 +89,7 @@ export class MessageSender extends LinkEntity {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
});
this._retryOptions = normalizeRetryOptions(retryOptions);
this._retryOptions = retryOptions;
this._onAmqpError = (context: EventContext) => {
const senderError = context.sender && context.sender.error;
if (senderError) {
Expand Down Expand Up @@ -327,7 +327,7 @@ export class MessageSender extends LinkEntity {
if (this._sender!.sendable()) {
try {
this._sender!.sendTimeoutInSeconds =
(this._retryOptions.timeoutInMs - timeTakenByInit) / 1000;
(this._retryOptions.timeoutInMs! - timeTakenByInit) / 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #8685, we added a guard to ensure this does not result in -ve value in Event Hubs. Can we include that in this PR for Service Bus

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

const delivery = await this._sender!.send(
encodedMessage,
undefined,
Expand Down
25 changes: 14 additions & 11 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.

import { generate_uuid } from "rhea-promise";
import { isTokenCredential, TokenCredential } from "@azure/core-amqp";
import { isTokenCredential, TokenCredential, Constants } from "@azure/core-amqp";
import {
ServiceBusClientOptions,
createConnectionContextForTokenCredential,
Expand All @@ -15,7 +15,6 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models";
import { Receiver, ReceiverImpl } from "./receivers/receiver";
import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver";
import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage";
import { getRetryAttemptTimeoutInMs } from "./util/utils";

/**
* A client that can create Sender instances for sending messages to queues and
Expand Down Expand Up @@ -81,9 +80,13 @@ export class ServiceBusClient {
}
this.fullyQualifiedNamespace = this._connectionContext.config.host;
this._clientOptions.retryOptions = this._clientOptions.retryOptions || {};
this._clientOptions.retryOptions.timeoutInMs = getRetryAttemptTimeoutInMs(
this._clientOptions.retryOptions
);

let timeoutInMs = this._clientOptions.retryOptions.timeoutInMs;
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
timeoutInMs = timeoutInMs == undefined ? Constants.defaultOperationTimeoutInMs : timeoutInMs;
if (typeof timeoutInMs !== "number" || !isFinite(timeoutInMs) || timeoutInMs <= 0) {
throw new Error(`${timeoutInMs} is an invalid value for retryOptions.timeoutInMs`);
}
this._clientOptions.retryOptions.timeoutInMs = timeoutInMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to set the timeout to the default value here? This forces us to use ! every time we want to use retryOptions.timeoutInMs in all other places.

Since the interface allows the timeout to not be set at all, I would suggest that we only do the error check here and copy over the timeoutInMs == undefined ? Constants.defaultOperationTimeoutInMs : timeoutInMs; to each place where the timeout is used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated messageSender with this, the other methods already have this. And thus avoided ! at all the places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, now that all consumers of retry options will check for undefined and fallback to the default value, do you need the above where you set this._clientOptions.retryOptions.timeoutInMs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, if the timeout is undefined, typeof timeoutInMs !== "number" will be satisfied and an error is thrown. I thought it to be an overkill to allow undefined by placing more guards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then add the timeout != undefined to your if check

if (timeout != undefined && (typeof timeoutInMs !== "number" || !isFinite(timeoutInMs) || timeoutInMs <= 0))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!!

}

/**
Expand All @@ -98,7 +101,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -127,7 +130,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -201,7 +204,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -240,7 +243,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -352,7 +355,7 @@ export class ServiceBusClient {
*
* See here for more information about dead letter queues:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -389,7 +392,7 @@ export class ServiceBusClient {
*
* See here for more information about dead letter queues:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down
42 changes: 0 additions & 42 deletions sdk/servicebus/service-bus/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { generate_uuid } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import * as Constants from "../util/constants";
import { Constants as CoreAMQPConstants, RetryOptions } from "@azure/core-amqp";

// This is the only dependency we have on DOM types, so rather than require
// the DOM lib we can just shim this in.
Expand Down Expand Up @@ -42,47 +41,6 @@ export function getUniqueName(name: string): string {
return `${name}-${generate_uuid()}`;
}

/**
* @internal
* @ignore
*
* TODO: I think this is duplicated from core-amqp and should be du-duped, but _before_
* that happens we should question whether it's even a legitimate way of setting the timeout
* because it just squashes all timeouts beneath 60 seconds to be 60 seconds instead.
*/
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
const timeoutInMs =
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
// TODO: not sure what the justification is for always forcing at least 60 seconds.
retryOptions.timeoutInMs < CoreAMQPConstants.defaultOperationTimeoutInMs
? CoreAMQPConstants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;

return timeoutInMs;
}

/**
* @internal
* @ignore
*/
export type RetryOptionsInternal = Required<Pick<RetryOptions, "timeoutInMs">> &
Exclude<RetryOptions, "timeoutInMs">;

/**
* @internal
* @ignore
*/
export function normalizeRetryOptions(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this was already done at SBC level, I see no point in keeping it now.

retryOptions: RetryOptions | undefined
): RetryOptionsInternal {
return {
...retryOptions,
timeoutInMs: getRetryAttemptTimeoutInMs(retryOptions)
};
}

/**
* @internal
* @ignore
Expand Down