Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Fixes LeaseLostException on Notifications API for Renewer #4276

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ private async Task ProcessPartitionAsync(DocumentServiceLease lease)
{
DefaultTrace.TraceVerbose("Lease with token {0}: processing canceled", lease.CurrentLeaseToken);
}
catch (LeaseLostException leaseLostException)
{
// LeaseLostException by itself is not loggable, unless it contains a related inner exception
ealsur marked this conversation as resolved.
Show resolved Hide resolved
// For cases when the lease or container has been deleted or the lease has been stolen
if (leaseLostException.InnerException != null)
{
await this.monitor.NotifyErrorAsync(lease.CurrentLeaseToken, leaseLostException.InnerException);
}

DefaultTrace.TraceVerbose("Lease with token {0}: lease was lost", lease.CurrentLeaseToken);
}
catch (Exception ex)
{
await this.monitor.NotifyErrorAsync(lease.CurrentLeaseToken, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public async Task Controller_ShouldNotify_IfProcessingFails()
manualResetEvent.Set();
throw exception;
});

Mock.Get(this.partitionSupervisorFactory)
.Setup(f => f.Create(this.lease))
.Returns(supervisor.Object);
Expand Down Expand Up @@ -532,6 +532,89 @@ public async Task Shutdown_ShouldNotify_Monitor()
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}

[TestMethod]
public async Task Renewer_ShouldNotNotify_IfLeaseAcquireFailsWithLeaseLost_WithoutInnerException()
{
Mock.Get(this.partitionProcessor)
.Reset();

Mock<PartitionSupervisor> supervisor = new Mock<PartitionSupervisor>();

LeaseLostException exception = new LeaseLostException();

ManualResetEvent manualResetEvent = new ManualResetEvent(false);

supervisor
.Setup(s => s.RunAsync(It.IsAny<CancellationToken>()))
.ThrowsAsync(exception)
.Callback((CancellationToken ct) =>
{
manualResetEvent.Set();
throw exception;
});

Mock.Get(this.partitionSupervisorFactory)
.Setup(f => f.Create(this.lease))
.Returns(supervisor.Object);

await this.sut.AddOrUpdateLeaseAsync(this.lease);

bool timeout = manualResetEvent.WaitOne(100);
Assert.IsTrue(timeout, "Partition supervisor not started");

this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, exception), Times.Never);

Mock.Get(this.leaseManager)
.Verify(manager => manager.ReleaseAsync(this.lease), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}

[TestMethod]
public async Task Renewer_ShouldNotify_IfLeaseAcquireFailsWithLeaseLost_WithInnerException()
{
Mock.Get(this.partitionProcessor)
.Reset();

Mock<PartitionSupervisor> supervisor = new Mock<PartitionSupervisor>();

Exception internalException = new Exception();

LeaseLostException exception = new LeaseLostException("some error", internalException);

ManualResetEvent manualResetEvent = new ManualResetEvent(false);

supervisor
.Setup(s => s.RunAsync(It.IsAny<CancellationToken>()))
.Callback((CancellationToken ct) =>
{
manualResetEvent.Set();
throw exception;
});

Mock.Get(this.partitionSupervisorFactory)
.Setup(f => f.Create(this.lease))
.Returns(supervisor.Object);

await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false);

bool timeout = manualResetEvent.WaitOne(100);
Assert.IsTrue(timeout, "Partition supervisor not started");

this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, internalException), Times.Once);

Mock.Get(this.leaseManager)
.Verify(manager => manager.ReleaseAsync(this.lease), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}



public Task InitializeAsync()
{
return Task.FromResult(false);
Expand Down