Skip to content

Commit

Permalink
[Event Hubs Client] Formatting Pass (#18356)
Browse files Browse the repository at this point in the history
The purpose of these changes is to format code to apply project conventions
for consistency and in some cases, update member names and documentation to
better convey context.

**Note:** These changes are intended to be superficial; no change to existing
behavior should have taken place.
  • Loading branch information
jsquire authored Feb 2, 2021
1 parent 061c377 commit 7cf8349
Show file tree
Hide file tree
Showing 34 changed files with 335 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,10 @@ public virtual void UpdateCheckpointStart(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
///
[Event(33, Level = EventLevel.Informational, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'.")]
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup)
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup)
{
if (IsEnabled())
{
Expand All @@ -363,11 +363,11 @@ public virtual void UpdateCheckpointComplete(string partitionId,
/// <param name="errorMessage">The message for the exception that occurred.</param>
///
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'. ErrorMessage: '{4}'.")]
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string errorMessage)
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string errorMessage)
{
if (IsEnabled())
{
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ public async Task GetCheckpointLogsStartAndComplete()
await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken());

mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Processor.Tests
/// being run, offering access to information such as environment variables.
/// </summary>
///
public class StorageTestEnvironment: TestEnvironment
public class StorageTestEnvironment : TestEnvironment
{
/// <summary>The singleton instance of the <see cref="StorageTestEnvironment" />, lazily created.</summary>
private static readonly Lazy<StorageTestEnvironment> Singleton = new Lazy<StorageTestEnvironment>(() => new StorageTestEnvironment(), LazyThreadSafetyMode.ExecutionAndPublication);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -79,7 +79,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -97,7 +97,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -148,7 +148,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

// Send a set of events.

Expand Down Expand Up @@ -180,7 +180,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ public async Task EventsCanBeReadByMultipleProcessorClients()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -288,7 +288,7 @@ public async Task ProcessorClientCreatesOwnership()

var processedEvents = new ConcurrentDictionary<string, EventData>();
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var storageManager = new InMemoryStorageManager(_ => {});
var storageManager = new InMemoryStorageManager(_ => { });
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
var processor = CreateProcessorWithIdentity(scope.ConsumerGroups.First(), scope.EventHubName, storageManager, options);

Expand Down Expand Up @@ -346,7 +346,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()

await using (var consumer = new EventHubConsumerClient(scope.ConsumerGroups.First(), connectionString))
{
await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
{
if (partitionEvent.Data.IsEquivalentTo(lastSourceEvent))
{
Expand Down Expand Up @@ -393,7 +393,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand Down Expand Up @@ -438,7 +438,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
var completionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var beforeCheckpointProcessHandler = CreateEventTrackingHandler(segmentEventCount, processedEvents, completionSource, cancellationSource.Token, processedEventCallback);
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
var storageManager = new InMemoryStorageManager(_ => {});
var storageManager = new InMemoryStorageManager(_ => { });
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, storageManager, options);

processor.ProcessErrorAsync += CreateAssertingErrorHandler();
Expand Down Expand Up @@ -476,7 +476,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
foreach (var sourceEvent in afterCheckpointEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
Expand All @@ -500,7 +500,7 @@ private EventProcessorClient CreateProcessor(string consumerGroup,
{
EventHubConnection createConnection() => new EventHubConnection(connectionString);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, "fakeNamespace", "fakeEventHub", Mock.Of<TokenCredential>(), createConnection, options);
}

Expand All @@ -524,7 +524,7 @@ private EventProcessorClient CreateProcessorWithIdentity(string consumerGroup,
var credential = EventHubsTestEnvironment.Instance.Credential;
EventHubConnection createConnection() => new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}

Expand All @@ -548,7 +548,7 @@ private EventProcessorClient CreateProcessorWithSharedAccessKey(string consumerG
var credential = new EventHubsSharedAccessKeyCredential(EventHubsTestEnvironment.Instance.SharedAccessKeyName, EventHubsTestEnvironment.Instance.SharedAccessKey);
EventHubConnection createConnection() => null; //new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);

storageManager ??= new InMemoryStorageManager(_=> {});
storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}

Expand Down Expand Up @@ -621,7 +621,7 @@ private Func<ProcessEventArgs, Task> CreateEventTrackingHandler(int targetCount,
if (processedEvents.Count >= targetCount)
{
completionSource.TrySetResult(true);
completionSource.TrySetResult(true);
}
}
}
Expand Down
Loading

0 comments on commit 7cf8349

Please sign in to comment.