Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Make close() work when a connection is in process #8746

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6d17e13
Handle the non-session side of making sure that close() will properly…
richardpark-msft May 6, 2020
203a90d
Apply the same open/close logic to MessageSession - if an open is in …
richardpark-msft May 6, 2020
741cb63
Adding in tests for streaming and batching receivers.
richardpark-msft May 7, 2020
0d3198a
Rollback change here - it won't work properly with the way that Sessi…
richardpark-msft May 7, 2020
0d1c8fe
Batch receiver is taken care of.
richardpark-msft May 7, 2020
fb7b198
Fixing several issues that was making the locking work inconsistently…
richardpark-msft May 7, 2020
923fcef
Some more updates:
richardpark-msft May 8, 2020
3330f4a
Whoops, should have committed with the previous commit. This is the "…
richardpark-msft May 8, 2020
5933c14
Consistency
richardpark-msft May 8, 2020
93dc121
Consistency!
richardpark-msft May 8, 2020
65a569e
We don't need a +!
richardpark-msft May 8, 2020
8c93c1f
Merge branch 'richardpark-7986-proper-close' of https://github.com/ri…
richardpark-msft May 8, 2020
ebb33e7
Fix inconsistent name
richardpark-msft May 8, 2020
8fc9bae
Adding in a comment for what the call stack structure (including the …
richardpark-msft May 8, 2020
2f0ca7f
Remove more +'age.
richardpark-msft May 8, 2020
bffee98
- Harsha's question about my test revealed it's lack of disrespect fo…
richardpark-msft May 9, 2020
052f576
Use a normal error with a .retryable field attached to it.
richardpark-msft May 9, 2020
5b63368
Remove throw and just go ahead and treat init() after close() in Mess…
richardpark-msft May 11, 2020
75b2325
StreamingReceiver is no longer a promise (we don't open it immediatel…
richardpark-msft May 11, 2020
f37aa0c
Merge remote-tracking branch 'upstream/hotfix/service-bus-1.1.7' into…
richardpark-msft May 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 73 additions & 51 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
RetryOperationType,
RetryConfig,
ConditionErrorNameMapper,
ErrorNameConditionMapper
ErrorNameConditionMapper,
defaultLock
} from "@azure/amqp-common";
import {
Receiver,
Expand Down Expand Up @@ -242,6 +243,14 @@ export class MessageReceiver extends LinkEntity {
*/
private _isDetaching: boolean = false;

/**
* Used to prevent _init() and close() from accidentally overriding each other.
*
* This also allows close() to block in case an _init() is in progress so it can
* properly shut down.
*/
private _openLock: string = getUniqueName("messageReceiverOpen");

constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) {
super(context.entityPath, context, {
address: context.entityPath,
Expand Down Expand Up @@ -771,44 +780,54 @@ export class MessageReceiver extends LinkEntity {
this.name = options.name;
}

this.isConnecting = true;
await this._negotiateClaim();
if (!options) {
options = this._createReceiverOptions();
}
log.error(
"[%s] Trying to create receiver '%s' with options %O",
connectionId,
this.name,
options
);
await defaultLock.acquire(this._openLock, async () => {
if (this.wasCloseInitiated) {
return;
}
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
this.name,
this.address
);
log[this.receiverType](
"Promise to create the receiver resolved. " + "Created receiver with name: ",
this.name
);
log[this.receiverType](
"[%s] Receiver '%s' created with receiver options: %O",
connectionId,
this.name,
options
);
// It is possible for someone to close the receiver and then start it again.
// Thus make sure that the receiver is present in the client cache.
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) {
this._context.streamingReceiver = this as any;
} else if (this.receiverType === ReceiverType.batching && !this._context.batchingReceiver) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
this.isConnecting = true;
Copy link
Contributor

@ramya-rao-a ramya-rao-a May 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isConnecting property gets checked in multiple places to make the decision of whether to call onDetached() or not so that there are not multiple attempts being made at recovering the link.

With the change in this PR, isConnecting is now being set to true only after the lock is acquired, resulting in potential multiple calls to onDetached now being a possibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though recently in #8401, @chradek did add a new _isDetaching flag to ensure that onDetached() is a no-op if it gets called multiple times..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But multiple calls to onDetached() will add to the noise in the logs

So, I would recommend moving the this.isConnecting = true to before the lock is acquired

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a deeper look at this. Your comments have me realize I need to consider the onDetached/open/close in tandem and I don't think I've done that enough.

await this._negotiateClaim();
if (!options) {
options = this._createReceiverOptions();
}
log.error(
"[%s] Trying to create receiver '%s' with options %O",
connectionId,
this.name,
options
);

this._receiver = await this._context.namespace.connection.createReceiver(options);

this.isConnecting = false;
log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
this.name,
this.address
);
log[this.receiverType](
"Promise to create the receiver resolved. " + "Created receiver with name: ",
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
this.name
);
log[this.receiverType](
"[%s] Receiver '%s' created with receiver options: %O",
connectionId,
this.name,
options
);
// It is possible for someone to close the receiver and then start it again.
// Thus make sure that the receiver is present in the client cache.
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) {
this._context.streamingReceiver = this as any;
} else if (
this.receiverType === ReceiverType.batching &&
!this._context.batchingReceiver
) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
});
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
Expand Down Expand Up @@ -1004,19 +1023,22 @@ export class MessageReceiver extends LinkEntity {
*/
async close(): Promise<void> {
this.wasCloseInitiated = true;
log.receiver(
"[%s] Closing the [%s]Receiver for entity '%s'.",
this._context.namespace.connectionId,
this.receiverType,
this._context.entityPath
);
if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
this._clearAllMessageLockRenewTimers();
if (this._receiver) {
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
}

return defaultLock.acquire(this._openLock, async () => {
log.receiver(
"[%s] Closing the [%s]Receiver for entity '%s'.",
this._context.namespace.connectionId,
this.receiverType,
this._context.entityPath
);
if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
this._clearAllMessageLockRenewTimers();
if (this._receiver) {
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
}
});
}

/**
Expand Down
Loading