From 840de47594b68a4e2fd7ad93362020eb6231ab2e Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Thu, 2 Jan 2020 16:35:10 -0500 Subject: [PATCH] [Event Hubs Client] Track Two (Test Stability) (#9269) The focus of these changes is to improve test stability, especially for Live tests in the nightly runs. Timing has been adjusted to allow for instability in ARM, which is notoriously cranky when resources are manipulated on-the-fly and to ensure all tests are using timed cancellation to ensure that they do not hang. --- .../Processor/EventProcessorClientTests.cs | 126 ++++++++++++------ .../src/Testing/LiveResourceManager.cs | 4 +- sdk/eventhub/tests.yml | 1 + 3 files changed, 87 insertions(+), 44 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs index e222eae6c291..615bdbcdc0e3 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs @@ -491,7 +491,8 @@ public async Task StartAsyncStartsTheEventProcessorWhenProcessingHandlerProperti mockProcessor.Object.ProcessEventAsync += eventArgs => Task.CompletedTask; mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask; - Assert.That(async () => await mockProcessor.Object.StartProcessingAsync(), Throws.Nothing); + using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + Assert.That(async () => await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token), Throws.Nothing); await mockProcessor.Object.StopProcessingAsync(); } @@ -616,12 +617,15 @@ public async Task CannotAddHandlerWhileProcessorIsRunning() mockProcessor.Object.ProcessEventAsync += eventArgs => Task.CompletedTask; mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask; - await mockProcessor.Object.StartProcessingAsync(); + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(() => mockProcessor.Object.PartitionInitializingAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf()); Assert.That(() => mockProcessor.Object.PartitionClosingAsync += eventArgs => Task.CompletedTask, Throws.InstanceOf()); - await mockProcessor.Object.StopProcessingAsync(); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); // Once stopped, the processor should allow handlers to be added again. @@ -650,6 +654,10 @@ public async Task CannotRemoveHandlerWhileProcessorIsRunning() It.IsAny())) .Returns(mockConsumer.Object); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + Func initHandler = eventArgs => Task.CompletedTask; Func closeHandler = eventArgs => Task.CompletedTask; Func eventHandler = eventArgs => Task.CompletedTask; @@ -660,14 +668,14 @@ public async Task CannotRemoveHandlerWhileProcessorIsRunning() mockProcessor.Object.ProcessEventAsync += eventHandler; mockProcessor.Object.ProcessErrorAsync += errorHandler; - await mockProcessor.Object.StartProcessingAsync(); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(() => mockProcessor.Object.PartitionInitializingAsync -= initHandler, Throws.InstanceOf()); Assert.That(() => mockProcessor.Object.PartitionClosingAsync -= closeHandler, Throws.InstanceOf()); Assert.That(() => mockProcessor.Object.ProcessEventAsync -= eventHandler, Throws.InstanceOf()); Assert.That(() => mockProcessor.Object.ProcessErrorAsync -= errorHandler, Throws.InstanceOf()); - await mockProcessor.Object.StopProcessingAsync(); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); // Once stopped, the processor should allow handlers to be removed again. @@ -701,13 +709,16 @@ public async Task IsRunningReturnsTrueWhileStopProcessingAsyncIsNotCalled() mockProcessor.Object.ProcessEventAsync += eventArgs => Task.CompletedTask; mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask; + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + Assert.That(mockProcessor.Object.IsRunning, Is.False); - await mockProcessor.Object.StartProcessingAsync(); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.IsRunning, Is.True); - await mockProcessor.Object.StopProcessingAsync(); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.IsRunning, Is.False); } @@ -787,9 +798,10 @@ public async Task VerifiesEventProcessorLogs() mockProcessor.Object.ProcessEventAsync += eventArgs => Task.CompletedTask; mockProcessor.Object.ProcessErrorAsync += eventArgs => Task.CompletedTask; - Assert.That(async () => await mockProcessor.Object.StartProcessingAsync(), Throws.Nothing); + using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await mockProcessor.Object.StopProcessingAsync(); + Assert.That(async () => await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token), Throws.Nothing); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); mockLog.Verify(m => m.EventProcessorStart(mockProcessor.Object.Identifier)); mockLog.Verify(m => m.RenewOwnershipStart(mockProcessor.Object.Identifier)); @@ -892,17 +904,20 @@ public async Task ProcessErrorAsyncCanStopTheEventProcessorClient() mockProcessor.Object.ProcessEventAsync += eventArgs => Task.CompletedTask; + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + var completionSource = new TaskCompletionSource(); mockProcessor.Object.ProcessErrorAsync += async eventArgs => { - await mockProcessor.Object.StopProcessingAsync(); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); completionSource.SetResult(true); }; // Start the processor and wait for the event handler to be triggered. - await mockProcessor.Object.StartProcessingAsync(); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); await completionSource.Task; // Ensure that the processor has been stopped. @@ -956,9 +971,13 @@ public async Task ProcessEventAsyncReceivesAnEmptyPartitionContextForNoData() // Start the processor and wait for the event handler to be triggered. - await mockProcessor.Object.StartProcessingAsync(); + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + + + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); await completionSource.Task; - await mockProcessor.Object.StopProcessingAsync(); + await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token); // Validate the empty event arguments. @@ -992,17 +1011,22 @@ public async Task StoppedClientRelinquishesPartitionOwnershipOtherClientsConside numberOfPartitions: NumberOfPartitions, clientOptions: default); + // Establish timed cancellation to ensure that the test doesn't hang. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(45)); + // Ownership should start empty. - var completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup); - Assert.That(completeOwnership.Count(), Is.EqualTo(0)); + var completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup, cancellationSource.Token); + Assert.That(completeOwnership.Any(), Is.False); // Start the processor so that the processor claims a random partition until none are left. - await processor1.StartProcessingAsync(); + await processor1.StartProcessingAsync(cancellationSource.Token); await processor1.WaitStabilization(); - completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup); + completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup, cancellationSource.Token); // All partitions are owned by Processor1. @@ -1010,9 +1034,9 @@ public async Task StoppedClientRelinquishesPartitionOwnershipOtherClientsConside // Stopping the processor should relinquish all partition ownership. - await processor1.StopProcessingAsync(); + await processor1.StopProcessingAsync(cancellationSource.Token); - completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup); + completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup, cancellationSource.Token); // No partitions are owned by Processor1. @@ -1021,16 +1045,16 @@ public async Task StoppedClientRelinquishesPartitionOwnershipOtherClientsConside // Start Processor2 so that the processor claims a random partition until none are left. // All partitions should be immediately claimable even though they were just claimed by the Processor1. - await processor2.StartProcessingAsync(); + await processor2.StartProcessingAsync(cancellationSource.Token); await processor2.WaitStabilization(); - completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup); + completeOwnership = await partitionManager.ListOwnershipAsync(processor1.FullyQualifiedNamespace, processor1.EventHubName, processor1.ConsumerGroup, cancellationSource.Token); // All partitions are owned by Processor2. Assert.That(completeOwnership.Count(p => p.OwnerIdentifier.Equals(processor2.Identifier)), Is.EqualTo(NumberOfPartitions)); - await processor2.StopProcessingAsync(); + await processor2.StopProcessingAsync(cancellationSource.Token); } /// @@ -1050,21 +1074,26 @@ public async Task FindAndClaimOwnershipAsyncClaimsAllClaimablePartitions() numberOfPartitions: NumberOfPartitions, clientOptions: default); + // Establish timed cancellation to ensure that the test doesn't hang. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + // ownership should start empty. - var completeOwnership = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + var completeOwnership = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); Assert.That(completeOwnership.Count(), Is.EqualTo(0)); // Start the processor so that the processor claims a random partition until none are left. - await processor.StartProcessingAsync(); + await processor.StartProcessingAsync(cancellationSource.Token); await processor.WaitStabilization(); completeOwnership = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); Assert.That(completeOwnership.Count(), Is.EqualTo(NumberOfPartitions)); - await processor.StopProcessingAsync(); + await processor.StopProcessingAsync(cancellationSource.Token); } /// @@ -1086,7 +1115,10 @@ public async Task FindAndClaimOwnershipAsyncClaimsPartitionsWhenOwnedEqualsMinim numberOfPartitions: NumberOfPartitions, clientOptions: default); - Console.WriteLine($"Processor1 = {processor.Identifier}"); + // Establish timed cancellation to ensure that the test doesn't hang. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(45)); // Create partitions owned by this Processor. @@ -1109,7 +1141,7 @@ public async Task FindAndClaimOwnershipAsyncClaimsPartitionsWhenOwnedEqualsMinim // Seed the partitionManager with all partitions. - await partitionManager.ClaimOwnershipAsync(completeOwnership); + await partitionManager.ClaimOwnershipAsync(completeOwnership, cancellationSource.Token); var consumerClient = processor.CreateConsumer(processor.ConsumerGroup, connection, default); @@ -1118,7 +1150,7 @@ public async Task FindAndClaimOwnershipAsyncClaimsPartitionsWhenOwnedEqualsMinim // Get owned partitions. - var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); var ownedByProcessor1 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == processor.Identifier); // Verify owned partitionIds match the owned partitions. @@ -1126,14 +1158,14 @@ public async Task FindAndClaimOwnershipAsyncClaimsPartitionsWhenOwnedEqualsMinim Assert.That(ownedByProcessor1.Count(), Is.EqualTo(MinimumpartitionCount)); Assert.That(ownedByProcessor1.Any(owned => claimablePartitionIds.Contains(owned.PartitionId)), Is.False); - // Start the processor to claim owership from of a Partition even though ownedPartitionCount == MinimumOwnedPartitionsCount. + // Start the processor to claim ownership from of a Partition even though ownedPartitionCount == MinimumOwnedPartitionsCount. - await processor.StartProcessingAsync(); + await processor.StartProcessingAsync(cancellationSource.Token); await processor.WaitStabilization(); // Get owned partitions. - totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); ownedByProcessor1 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == processor.Identifier); // Verify that we took ownership of the additional partition. @@ -1141,7 +1173,7 @@ public async Task FindAndClaimOwnershipAsyncClaimsPartitionsWhenOwnedEqualsMinim Assert.That(ownedByProcessor1.Count(), Is.GreaterThan(MinimumpartitionCount)); Assert.That(ownedByProcessor1.Any(owned => claimablePartitionIds.Contains(owned.PartitionId)), Is.True); - await processor.StopProcessingAsync(); + await processor.StopProcessingAsync(cancellationSource.Token); } /// @@ -1164,6 +1196,11 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn numberOfPartitions: NumberOfPartitions, clientOptions: default); + // Establish timed cancellation to ensure that the test doesn't hang. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + // Create partitions owned by this Processor. var processor1PartitionIds = Enumerable.Range(1, MinimumpartitionCount); @@ -1185,11 +1222,11 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn // Seed the partitionManager with the owned partitions. - await partitionManager.ClaimOwnershipAsync(completeOwnership); + await partitionManager.ClaimOwnershipAsync(completeOwnership, cancellationSource.Token); // Get owned partitions. - var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); var ownedByProcessor1 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == processor.Identifier); var ownedByProcessor3 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == Processor3Id); @@ -1201,9 +1238,9 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn Assert.That(ownedByProcessor3.Count(), Is.GreaterThan(MaximumpartitionCount)); - // Start the processor to steal owership from of a when ownedPartitionCount == MinimumOwnedPartitionsCount but a processor owns > MaximumPartitionCount. + // Start the processor to steal ownership from of a when ownedPartitionCount == MinimumOwnedPartitionsCount but a processor owns > MaximumPartitionCount. - await processor.StartProcessingAsync(); + await processor.StartProcessingAsync(cancellationSource.Token); await processor.WaitStabilization(); // Get owned partitions. @@ -1220,7 +1257,7 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn Assert.That(ownedByProcessor3.Count(), Is.EqualTo(MaximumpartitionCount)); - await processor.StopProcessingAsync(); + await processor.StopProcessingAsync(cancellationSource.Token); } /// @@ -1243,6 +1280,11 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn numberOfPartitions: NumberOfPartitions, clientOptions: default); + // Establish timed cancellation to ensure that the test doesn't hang. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(30)); + // Create more partitions owned by this Processor. var processor1PartitionIds = Enumerable.Range(1, MinimumpartitionCount - 1); @@ -1268,7 +1310,7 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn // Get owned partitions. - var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + var totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); var ownedByProcessor1 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == processor.Identifier); var ownedByProcessor3 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == Processor3Id); @@ -1280,14 +1322,14 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn Assert.That(ownedByProcessor3.Count(), Is.EqualTo(MaximumpartitionCount)); - // Start the processor to steal owership from of a when ownedPartitionCount == MinimumOwnedPartitionsCount but a processor owns > MaximumPartitionCount. + // Start the processor to steal ownership from of a when ownedPartitionCount == MinimumOwnedPartitionsCount but a processor owns > MaximumPartitionCount. - await processor.StartProcessingAsync(); + await processor.StartProcessingAsync(cancellationSource.Token); await processor.WaitStabilization(); // Get owned partitions. - totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup); + totalOwnedPartitions = await partitionManager.ListOwnershipAsync(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup, cancellationSource.Token); ownedByProcessor1 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == processor.Identifier); ownedByProcessor3 = totalOwnedPartitions.Where(p => p.OwnerIdentifier == Processor3Id); @@ -1299,7 +1341,7 @@ public async Task FindAndClaimOwnershipAsyncStealsPartitionsWhenThisProcessorOwn Assert.That(ownedByProcessor3.Count(), Is.LessThan(MaximumpartitionCount)); - await processor.StopProcessingAsync(); + await processor.StopProcessingAsync(cancellationSource.Token); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs index b214d3947c96..5b5840321531 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/LiveResourceManager.cs @@ -28,10 +28,10 @@ public sealed class LiveResourceManager private const int RetryMaximumAttemps = 15; /// The number of seconds to use as the basis for backing off on retry attempts. - private const double RetryExponentialBackoffSeconds = 2.5; + private const double RetryExponentialBackoffSeconds = 3.0; /// The number of seconds to use as the basis for applying jitter to retry back-off calculations. - private const double RetryBaseJitterSeconds = 20.0; + private const double RetryBaseJitterSeconds = 30.0; /// The buffer to apply when considering refreshing; credentials that expire less than this duration will be refreshed. private static readonly TimeSpan CredentialRefreshBuffer = TimeSpan.FromMinutes(5); diff --git a/sdk/eventhub/tests.yml b/sdk/eventhub/tests.yml index 253a2f973cd5..863bbe60c27a 100644 --- a/sdk/eventhub/tests.yml +++ b/sdk/eventhub/tests.yml @@ -12,6 +12,7 @@ jobs: parameters: MaxParallel: 1 ServiceDirectory: eventhub + TimeoutInMinutes: 120 EnvVars: EVENT_HUBS_CLIENT: $(aad-azure-sdk-test-client-id) EVENT_HUBS_SECRET: $(aad-azure-sdk-test-client-secret)