Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Allow timeout lesser than 60 seconds #8805

Merged
merged 16 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName, normalizeRetryOptions, RetryOptionsInternal } from "../util/utils";
import { getUniqueName } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -77,7 +77,7 @@ export class MessageSender extends LinkEntity {
* @property {Sender} [_sender] The AMQP sender link.
*/
private _sender?: AwaitableSender;
private _retryOptions: RetryOptionsInternal;
private _retryOptions: RetryOptions;

/**
* Creates a new MessageSender instance.
Expand All @@ -89,7 +89,7 @@ export class MessageSender extends LinkEntity {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
});
this._retryOptions = normalizeRetryOptions(retryOptions);
this._retryOptions = retryOptions;
this._onAmqpError = (context: EventContext) => {
const senderError = context.sender && context.sender.error;
if (senderError) {
Expand Down Expand Up @@ -254,6 +254,10 @@ export class MessageSender extends LinkEntity {
options: OperationOptions | undefined
): Promise<void> {
const abortSignal = options?.abortSignal;
let timeoutInMs =
this._retryOptions.timeoutInMs == undefined
? Constants.defaultOperationTimeoutInMs
: this._retryOptions.timeoutInMs;

const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
Expand All @@ -279,7 +283,7 @@ export class MessageSender extends LinkEntity {
description: desc
};
return rejectInitTimeoutPromise(translate(e));
}, this._retryOptions.timeoutInMs);
}, timeoutInMs);
});

try {
Expand Down Expand Up @@ -325,9 +329,20 @@ export class MessageSender extends LinkEntity {
);
}
if (this._sender!.sendable()) {
if (timeoutInMs <= timeTakenByInit) {
const desc: string =
`[${this._context.namespace.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
}
try {
this._sender!.sendTimeoutInSeconds =
(this._retryOptions.timeoutInMs - timeTakenByInit) / 1000;
this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
const delivery = await this._sender!.send(
encodedMessage,
undefined,
Expand Down
24 changes: 14 additions & 10 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models";
import { Receiver, ReceiverImpl } from "./receivers/receiver";
import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver";
import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage";
import { getRetryAttemptTimeoutInMs } from "./util/utils";

/**
* A client that can create Sender instances for sending messages to queues and
Expand Down Expand Up @@ -81,9 +80,14 @@ export class ServiceBusClient {
}
this.fullyQualifiedNamespace = this._connectionContext.config.host;
this._clientOptions.retryOptions = this._clientOptions.retryOptions || {};
this._clientOptions.retryOptions.timeoutInMs = getRetryAttemptTimeoutInMs(
this._clientOptions.retryOptions
);

let timeoutInMs = this._clientOptions.retryOptions.timeoutInMs;
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
if (
timeoutInMs != undefined &&
(typeof timeoutInMs !== "number" || !isFinite(timeoutInMs) || timeoutInMs <= 0)
) {
throw new Error(`${timeoutInMs} is an invalid value for retryOptions.timeoutInMs`);
}
}

/**
Expand All @@ -98,7 +102,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -127,7 +131,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -201,7 +205,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -240,7 +244,7 @@ export class ServiceBusClient {
*
* You can settle a message by calling complete(), abandon(), defer() or deadletter() methods on
* the message.
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -352,7 +356,7 @@ export class ServiceBusClient {
*
* See here for more information about dead letter queues:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down Expand Up @@ -389,7 +393,7 @@ export class ServiceBusClient {
*
* See here for more information about dead letter queues:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues
*
*
* More information about how peekLock and message settlement works here:
* https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
*
Expand Down
42 changes: 0 additions & 42 deletions sdk/servicebus/service-bus/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { generate_uuid } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import * as Constants from "../util/constants";
import { Constants as CoreAMQPConstants, RetryOptions } from "@azure/core-amqp";

// This is the only dependency we have on DOM types, so rather than require
// the DOM lib we can just shim this in.
Expand Down Expand Up @@ -42,47 +41,6 @@ export function getUniqueName(name: string): string {
return `${name}-${generate_uuid()}`;
}

/**
* @internal
* @ignore
*
* TODO: I think this is duplicated from core-amqp and should be du-duped, but _before_
* that happens we should question whether it's even a legitimate way of setting the timeout
* because it just squashes all timeouts beneath 60 seconds to be 60 seconds instead.
*/
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
const timeoutInMs =
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
// TODO: not sure what the justification is for always forcing at least 60 seconds.
retryOptions.timeoutInMs < CoreAMQPConstants.defaultOperationTimeoutInMs
? CoreAMQPConstants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;

return timeoutInMs;
}

/**
* @internal
* @ignore
*/
export type RetryOptionsInternal = Required<Pick<RetryOptions, "timeoutInMs">> &
Exclude<RetryOptions, "timeoutInMs">;

/**
* @internal
* @ignore
*/
export function normalizeRetryOptions(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this was already done at SBC level, I see no point in keeping it now.

retryOptions: RetryOptions | undefined
): RetryOptionsInternal {
return {
...retryOptions,
timeoutInMs: getRetryAttemptTimeoutInMs(retryOptions)
};
}

/**
* @internal
* @ignore
Expand Down