Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Make close() work when a connection is in process #8746

Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6d17e13
Handle the non-session side of making sure that close() will properly…
richardpark-msft May 6, 2020
203a90d
Apply the same open/close logic to MessageSession - if an open is in …
richardpark-msft May 6, 2020
741cb63
Adding in tests for streaming and batching receivers.
richardpark-msft May 7, 2020
0d3198a
Rollback change here - it won't work properly with the way that Sessi…
richardpark-msft May 7, 2020
0d1c8fe
Batch receiver is taken care of.
richardpark-msft May 7, 2020
fb7b198
Fixing several issues that was making the locking work inconsistently…
richardpark-msft May 7, 2020
923fcef
Some more updates:
richardpark-msft May 8, 2020
3330f4a
Whoops, should have committed with the previous commit. This is the "…
richardpark-msft May 8, 2020
5933c14
Consistency
richardpark-msft May 8, 2020
93dc121
Consistency!
richardpark-msft May 8, 2020
65a569e
We don't need a +!
richardpark-msft May 8, 2020
8c93c1f
Merge branch 'richardpark-7986-proper-close' of https://github.com/ri…
richardpark-msft May 8, 2020
ebb33e7
Fix inconsistent name
richardpark-msft May 8, 2020
8fc9bae
Adding in a comment for what the call stack structure (including the …
richardpark-msft May 8, 2020
2f0ca7f
Remove more +'age.
richardpark-msft May 8, 2020
bffee98
- Harsha's question about my test revealed it's lack of disrespect fo…
richardpark-msft May 9, 2020
052f576
Use a normal error with a .retryable field attached to it.
richardpark-msft May 9, 2020
5b63368
Remove throw and just go ahead and treat init() after close() in Mess…
richardpark-msft May 11, 2020
75b2325
StreamingReceiver is no longer a promise (we don't open it immediatel…
richardpark-msft May 11, 2020
f37aa0c
Merge remote-tracking branch 'upstream/hotfix/service-bus-1.1.7' into…
richardpark-msft May 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,13 @@ export class BatchingReceiver extends MessageReceiver {
});
this._init(rcvrOptions)
.then(() => {
this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
if (!this._receiver) {
// there's a really small window here where the receiver can be closed
// if that happens we'll just resolve to an empty array of messages.
return resolve([]);
}

this._receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer();
return;
})
Expand Down
140 changes: 88 additions & 52 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
RetryOperationType,
RetryConfig,
ConditionErrorNameMapper,
ErrorNameConditionMapper
ErrorNameConditionMapper,
defaultLock
} from "@azure/amqp-common";
import {
Receiver,
Expand All @@ -26,6 +27,7 @@ import { ServiceBusMessage, DispositionType, ReceiveMode } from "../serviceBusMe
import { getUniqueName, calculateRenewAfterDuration } from "../util/utils";
import { MessageHandlerOptions } from "./streamingReceiver";
import { DispositionStatusOptions } from "./managementClient";
import { getReceiverClosedErrorMsg } from "../util/errors";

/**
* @internal
Expand Down Expand Up @@ -242,6 +244,14 @@ export class MessageReceiver extends LinkEntity {
*/
private _isDetaching: boolean = false;

/**
* Used to prevent _init() and close() from accidentally overriding each other.
*
* This also allows close() to block in case an _init() is in progress so it can
* properly shut down.
*/
private _openLock: string = getUniqueName("messageReceiverOpen");

constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) {
super(context.entityPath, context, {
address: context.entityPath,
Expand Down Expand Up @@ -755,7 +765,7 @@ export class MessageReceiver extends LinkEntity {
*
* @returns {Promise<void>} Promise<void>.
*/
protected async _init(options?: ReceiverOptions): Promise<void> {
async _init(options?: ReceiverOptions): Promise<void> {
const connectionId = this._context.namespace.connectionId;
try {
if (!this.isOpen() && !this.isConnecting) {
Expand All @@ -771,44 +781,67 @@ export class MessageReceiver extends LinkEntity {
this.name = options.name;
}

this.isConnecting = true;
await this._negotiateClaim();
if (!options) {
options = this._createReceiverOptions();
}
log.error(
"[%s] Trying to create receiver '%s' with options %O",
connectionId,
this.name,
options
);
await defaultLock.acquire(this._openLock, async () => {
if (this.wasCloseInitiated) {
const err = new MessagingError(
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
getReceiverClosedErrorMsg(
this._context.entityPath,
this._context.clientType,
this._context.isClosed,
undefined
)
);
err.retryable = false;
throw err;
}

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
this.name,
this.address
);
log[this.receiverType](
"Promise to create the receiver resolved. " + "Created receiver with name: ",
this.name
);
log[this.receiverType](
"[%s] Receiver '%s' created with receiver options: %O",
connectionId,
this.name,
options
);
// It is possible for someone to close the receiver and then start it again.
// Thus make sure that the receiver is present in the client cache.
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) {
this._context.streamingReceiver = this as any;
} else if (this.receiverType === ReceiverType.batching && !this._context.batchingReceiver) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
if (this.isOpen()) {
return;
}
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved

this.isConnecting = true;
Copy link
Contributor

@ramya-rao-a ramya-rao-a May 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isConnecting property gets checked in multiple places to make the decision of whether to call onDetached() or not so that there are not multiple attempts being made at recovering the link.

With the change in this PR, isConnecting is now being set to true only after the lock is acquired, resulting in potential multiple calls to onDetached now being a possibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though recently in #8401, @chradek did add a new _isDetaching flag to ensure that onDetached() is a no-op if it gets called multiple times..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But multiple calls to onDetached() will add to the noise in the logs

So, I would recommend moving the this.isConnecting = true to before the lock is acquired

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a deeper look at this. Your comments have me realize I need to consider the onDetached/open/close in tandem and I don't think I've done that enough.

await this._negotiateClaim();
if (!options) {
options = this._createReceiverOptions();
}
log.error(
"[%s] Trying to create receiver '%s' with options %O",
connectionId,
this.name,
options
);

this._receiver = await this._context.namespace.connection.createReceiver(options);

this.isConnecting = false;
log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
this.name,
this.address
);
log[this.receiverType](
"Promise to create the receiver resolved. Created receiver with name: ",
this.name
);
log[this.receiverType](
"[%s] Receiver '%s' created with receiver options: %O",
connectionId,
this.name,
options
);
// It is possible for someone to close the receiver and then start it again.
// Thus make sure that the receiver is present in the client cache.
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) {
this._context.streamingReceiver = this as any;
} else if (
this.receiverType === ReceiverType.batching &&
!this._context.batchingReceiver
) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
});
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
Expand Down Expand Up @@ -1004,19 +1037,22 @@ export class MessageReceiver extends LinkEntity {
*/
async close(): Promise<void> {
this.wasCloseInitiated = true;
log.receiver(
"[%s] Closing the [%s]Receiver for entity '%s'.",
this._context.namespace.connectionId,
this.receiverType,
this._context.entityPath
);
if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
this._clearAllMessageLockRenewTimers();
if (this._receiver) {
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
}

return defaultLock.acquire(this._openLock, async () => {
log.receiver(
"[%s] Closing the [%s]Receiver for entity '%s'.",
this._context.namespace.connectionId,
this.receiverType,
this._context.entityPath
);
if (this._newMessageReceivedTimer) clearTimeout(this._newMessageReceivedTimer);
this._clearAllMessageLockRenewTimers();
if (this._receiver) {
const receiverLink = this._receiver;
this._deleteFromCache();
await this._closeLink(receiverLink);
}
});
}

/**
Expand Down
11 changes: 3 additions & 8 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,11 @@ export class StreamingReceiver extends MessageReceiver {
* @param {ReceiveOptions} [options] Receive options.
* @return {Promise<StreamingReceiver>} A promise that resolves with an instance of StreamingReceiver.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
*/
static async create(
context: ClientEntityContext,
options?: ReceiveOptions
): Promise<StreamingReceiver> {
static create(context: ClientEntityContext, options?: ReceiveOptions): StreamingReceiver {
throwErrorIfConnectionClosed(context.namespace);
if (!options) options = {};
if (options.autoComplete == null) options.autoComplete = true;
const sReceiver = new StreamingReceiver(context, options);
await sReceiver._init();
context.streamingReceiver = sReceiver;
return sReceiver;

return new StreamingReceiver(context, options);
}
}
29 changes: 22 additions & 7 deletions sdk/servicebus/service-bus/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,18 @@ export class Receiver {
throw new TypeError("The parameter 'onError' must be of type 'function'.");
}

StreamingReceiver.create(this._context, {
...options,
receiveMode: this._receiveMode
})
.then(async (sReceiver) => {
if (!this._context.streamingReceiver) {
this._context.streamingReceiver = StreamingReceiver.create(this._context, {
...options,
receiveMode: this._receiveMode
});
}

this._context.streamingReceiver
._init()
.then(async () => {
const sReceiver = this._context.streamingReceiver;

if (!sReceiver) {
return;
}
Expand All @@ -146,6 +153,10 @@ export class Receiver {
return;
})
.catch((err) => {
if (this._context.streamingReceiver != null && !this._context.streamingReceiver.isOpen()) {
this._context.streamingReceiver = undefined;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What scenario does this cover?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the "init() failed and we have a streaming receiver that is not open/doesn't own any resources".

There was a test covering this (Streaming - Failed init should not cache recevier) that revealed this behavior which seems sensible. If a streaming receiver is just completely dead (ie, nothing to clean up) it's safe to just not cache it at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this change is because we are now caching first, initializing later?

A cached streaming receiver symbolizes a receiver that needs to be recovered when there is a connection issue. I am slightly concerned on the implications of this change in order on link recovery.

Take the case of init() failing and a connection issue happening at around the same time.
If the connection recovery parts of the code get executed before the catch block here, then this streaming receiver would be on its merry way to being recovered leading to something like #5541

Can we refactor so that we keep the init first, cache later as before, but still have the changes you want?

const sReceiver = StreamingReceiver.create(..);
sReceiver.init().then(() => {
      if (this.isClosed) {
         await sReceiver.close();
         return;
      };
      this._context.streamingReceiver = sReceiver;
      sReceiver.receive(...)
  }).catch (err) { 
  onError(err); 
  }

Also, related issue and PR from the past: #1730 and #2139


onError(err);
});
}
Expand Down Expand Up @@ -174,7 +185,7 @@ export class Receiver {
this._throwIfReceiverOrConnectionClosed();
this._throwIfAlreadyReceiving();

if (!this._context.batchingReceiver || !this._context.batchingReceiver.isOpen()) {
if (!this._context.batchingReceiver) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is driving this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it it seemed unnecessary - we can call _init() multiple times on a batchingreceiver and each time will work the same (or early exit if it's already open).

This just made it consistent with the check I was doing for streamingReceiver.

(I can bring it back - I have no strong feelings on it).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On thinking about this more, I think that the way I have it now is just simpler to reason through. We don't have to worry about any overlap/concurrency issues for the field anymore. It's either set or not and any other calls that manipulate it's state are protected by the lock.

If we don't do that then I have to start reasoning about whether it's possible for two concurrent instances of _context.batchingReceiver can be there (ie, we've swapped out an older one for a newer one and we're somehow initializing the older one). So I think this is worth keeping.

const options: ReceiveOptions = {
maxConcurrentCalls: 0,
receiveMode: this._receiveMode
Expand Down Expand Up @@ -432,12 +443,16 @@ export class SessionReceiver {
return;
}
this._context.isSessionEnabled = true;
this._messageSession = await MessageSession.create(this._context, {

this._messageSession = MessageSession.create(this._context, {
sessionId: this._sessionOptions.sessionId,
maxSessionAutoRenewLockDurationInSeconds: this._sessionOptions
.maxSessionAutoRenewLockDurationInSeconds,
receiveMode: this._receiveMode
});

await this._messageSession._init();

// By this point, we should have a valid sessionId on the messageSession
// If not, the receiver cannot be used, so throw error.
if (this._messageSession.sessionId == null) {
Expand Down
Loading