Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Prefetch Size In Bytes Option #15141

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release History

## 5.3.0-beta.1 (2020-09-15)

### Changes

#### New Features

- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.

## 5.2.0 (2020-09-08)

### Acknowledgments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public EventProcessorClientOptions() { }
public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.2.0</Version>
<ApiCompatVersion>5.1.0</ApiCompatVersion>
<Version>5.3.0-beta.1</Version>
<ApiCompatVersion>5.2.0</ApiCompatVersion>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>

<ItemGroup>
<!-- Version overrides are specific to the v5.2.0 release from the branch; this will be removed and the package properties updated when merged to master -->
<PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.2.0" />
<PackageReference Include="Azure.Storage.Blobs" VersionOverride="12.6.0"/>
<!-- END v5.2.0 SPECIFIC OVERRIDES -->
<!-- Version overrides are specific to the v5.3.0-beta.1 release from the branch; this will be removed and the package properties updated when merged to master -->
<ProjectReference Include="$(MSBuildThisFileDirectory)..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" /><!-- Needed until core v5.3.0-beta.1 package is published -->
<!--PackageReference Include="Azure.Messaging.EventHubs" VersionOverride="5.2.0" /-->

<PackageReference Include="Azure.Storage.Blobs" VersionOverride="12.6.0"/>
<PackageReference Include="Microsoft.Azure.Amqp" VersionOverride="2.4.6"/>
<!-- END v5.3.0-beta.1 SPECIFIC OVERRIDES -->

<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="System.Reflection.TypeExtensions" />
Expand Down Expand Up @@ -60,7 +63,7 @@
<!-- <Import Project="$(MSBuildThisFileDirectory)..\..\..\core\Azure.Core\src\Azure.Core.props" /> -->

<!--
THIS SECTION IS SPECIFIC TO THE v5.2.0 RELEASE
THIS SECTION IS SPECIFIC TO THE v5.3.0-beta.1 RELEASE

These are normally set via the above reference to Azure.Core.props, but have been included here
to allow a specific override to the Azure.Core version for Event Hubs only; when merged back to
Expand All @@ -81,5 +84,5 @@
<ItemGroup Condition="'$(Nullable)'=='enable' Or '$(UseAzureCoreNullableAttributes)'=='true'">
<Compile Include="$(AzureCoreSharedSources)NullableAttributes.cs" />
</ItemGroup>
<!-- END v5.2.0 SPECIFIC OVERRIDES -->
<!-- END v5.3.0-beta.1 SPECIFIC OVERRIDES -->
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,8 @@ private static EventProcessorOptions CreateOptions(EventProcessorClientOptions c
MaximumWaitTime = clientOptions.MaximumWaitTime,
TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties,
LoadBalancingStrategy = clientOptions.LoadBalancingStrategy,
PrefetchCount = clientOptions.PrefetchCount
PrefetchCount = clientOptions.PrefetchCount,
PrefetchSizeInBytes = clientOptions.PrefetchSizeInBytes
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class EventProcessorClientOptions
/// <summary>The prefetch count to use when reading events.</summary>
private int _prefetchCount = 300;

/// <summary>The prefetch size limit to use for the partition receiver.</summary>
private long? _prefetchSizeInBytes = default;

/// <summary>The set of options to use for configuring the connection to the Event Hubs service.</summary>
private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions();

Expand Down Expand Up @@ -135,9 +138,9 @@ public int CacheEventCount
}

/// <summary>
/// The number of events that will be eagerly requested from the Event Hubs service and staged locally without regard to
/// whether a reader is currently active, intended to help maximize throughput by buffering service operations rather than
/// readers needing to wait for service operations to complete.
/// The number of events that will be eagerly requested from the Event Hubs service and queued locally without regard to
/// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from
/// from a local cache rather than waiting on a service request.
/// </summary>
///
/// <value>
Expand All @@ -147,8 +150,8 @@ public int CacheEventCount
/// </value>
///
/// <remarks>
/// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service. The
/// larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// The size of the prefetch count has an influence on the efficiency of reading events from the Event Hubs service.
/// The larger the size of the cache, the more efficiently service operations can be buffered in the background to
/// improve throughput. This comes at the cost of additional memory use and potentially increases network I/O.
///
/// For scenarios where the size of events is small and many events are flowing through the system, using a larger
Expand All @@ -172,6 +175,38 @@ public int PrefetchCount
}
}

/// <summary>
/// The desired number of bytes to attempt to eagerly request from the Event Hubs service and queued locally without regard to
/// whether a read operation is currently active, intended to help maximize throughput by allowing events to be read from
/// from a local cache rather than waiting on a service request.
/// </summary>
///
/// <value>
/// <para>When set to <c>null</c>, the option is considered disabled; otherwise, it will be considered enabled and take
/// precedence over any value specified for the <see cref="PrefetchCount" />The <see cref="PrefetchSizeInBytes" /> is an
/// advanced control that developers can use to help tune performance in some scenarios; it is recommended to prefer using
/// the <see cref="PrefetchCount" /> over this option where possible for more accurate control and more predictable throughput.</para>
///
/// <para>This size should be considered a statement of intent rather than a guaranteed limit; the local cache may be larger or
/// smaller than the number of bytes specified, and will always contain at least one event when the <see cref="PrefetchSizeInBytes" />
/// is specified. A heuristic is used to predict the average event size to use for size calculations, which should be expected to fluctuate
/// as traffic passes through the system. Consequently, the resulting resource use will fluctuate as well.</para>
/// </value>
///
public long? PrefetchSizeInBytes
{
get => _prefetchSizeInBytes;

set
{
if (value.HasValue)
{
Argument.AssertAtLeast(value.Value, 0, nameof(PrefetchSizeInBytes));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 0 a valid value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oddly enough, yes. That will have the net effect of "always prefetch exactly one event no matter the size",

}
_prefetchSizeInBytes = value;
}
}

/// <summary>
/// Gets or sets the options used for configuring the connection to the Event Hubs service.
/// </summary>
Expand Down Expand Up @@ -246,6 +281,7 @@ internal EventProcessorClientOptions Clone() =>
_maximumWaitTime = _maximumWaitTime,
_cacheEventCount = _cacheEventCount,
_prefetchCount = _prefetchCount,
_prefetchSizeInBytes = PrefetchSizeInBytes,
_connectionOptions = ConnectionOptions.Clone(),
_retryOptions = RetryOptions.Clone()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void CloneProducesACopy()
MaximumWaitTime = TimeSpan.FromMinutes(65),
CacheEventCount = 1,
PrefetchCount = 0,
PrefetchSizeInBytes = 200,
RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(1), Delay = TimeSpan.FromMinutes(4) },
ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }
};
Expand All @@ -46,6 +47,7 @@ public void CloneProducesACopy()
Assert.That(clone.MaximumWaitTime, Is.EqualTo(options.MaximumWaitTime), "The maximum wait time of the clone should match.");
Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache size of the clone should match.");
Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match.");
Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch byte size of the clone should match.");
Assert.That(clone.ConnectionOptions.TransportType, Is.EqualTo(options.ConnectionOptions.TransportType), "The connection options of the clone should copy properties.");
Assert.That(clone.ConnectionOptions, Is.Not.SameAs(options.ConnectionOptions), "The connection options of the clone should be a copy, not the same instance.");
Assert.That(clone.RetryOptions.IsEquivalentTo(options.RetryOptions), Is.True, "The retry options of the clone should be considered equal.");
Expand Down Expand Up @@ -113,6 +115,39 @@ public void PrefetchCountAllowsZero()
Assert.That(() => new EventProcessorClientOptions { PrefetchCount = 0 }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesIsValidated()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = -1 }, Throws.InstanceOf<ArgumentException>());
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesAllowsZero()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = 0 }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PrefetchSizeInBytes" />
/// property.
/// </summary>
///
[Test]
public void PrefetchSizeInBytesAllowsNull()
{
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = null }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.ConnectionOptions" />
/// property.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void assertOptionsMatch(EventProcessorOptions expected,
Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor.");
Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor.");
}

var clientOptions = new EventProcessorClientOptions
Expand All @@ -131,7 +132,9 @@ void assertOptionsMatch(EventProcessorOptions expected,
RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
Identifier = "OMG, HAI!",
MaximumWaitTime = TimeSpan.FromDays(54),
TrackLastEnqueuedEventProperties = true
TrackLastEnqueuedEventProperties = true,
PrefetchCount = 5,
PrefetchSizeInBytes = 500
};

var expectedOptions = InvokeCreateOptions(clientOptions);
Expand Down Expand Up @@ -1408,7 +1411,8 @@ public void ClientOptionsCanBeTranslated()
MaximumWaitTime = TimeSpan.FromDays(54),
TrackLastEnqueuedEventProperties = true,
LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
PrefetchCount = 9990
PrefetchCount = 9990,
PrefetchSizeInBytes = 400
};

var defaultOptions = new EventProcessorOptions();
Expand All @@ -1424,6 +1428,7 @@ public void ClientOptionsCanBeTranslated()
Assert.That(processorOptions.TrackLastEnqueuedEventProperties, Is.EqualTo(clientOptions.TrackLastEnqueuedEventProperties), "The flag for last event tracking should have been set.");
Assert.That(processorOptions.LoadBalancingStrategy, Is.EqualTo(clientOptions.LoadBalancingStrategy), "The load balancing strategy should have been set.");
Assert.That(processorOptions.PrefetchCount, Is.EqualTo(clientOptions.PrefetchCount), "The prefetch count should have been set.");
Assert.That(processorOptions.PrefetchSizeInBytes, Is.EqualTo(clientOptions.PrefetchSizeInBytes), "The prefetch byte size should have been set.");

Assert.That(processorOptions.DefaultStartingPosition, Is.EqualTo(defaultOptions.DefaultStartingPosition), "The default starting position should not have been set.");
Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(defaultOptions.LoadBalancingUpdateInterval), "The load balancing interval should not have been set.");
Expand Down
8 changes: 7 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Release History

## 5.3.0-beta.1 (Unreleased)
## 5.3.0-beta.1 (2020-09-15)

### Changes

#### New Features

- Introduction of an option for the various event consumers allowing the prefetch cache to be filled based on a size-based heuristic rather than a count of events. This feature is considered a special case, helpful in scenarios where the size of events being read is not able to be known or predicted upfront and limiting resource use is valued over consistent and predictable performance.

## 5.2.0 (2020-09-08)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public ReadEventOptions() { }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public long? OwnerLevel { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down Expand Up @@ -265,6 +266,7 @@ public EventProcessorOptions() { }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down Expand Up @@ -355,6 +357,7 @@ public PartitionReceiverOptions() { }
public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } }
public long? OwnerLevel { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
5 changes: 4 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ public override TransportProducer CreateProducer(string partitionId,
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested. If <c>null</c> a default will be used.</param>
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
///
/// <returns>A <see cref="TransportConsumer" /> configured in the requested manner.</returns>
///
Expand All @@ -426,7 +427,8 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
EventHubsRetryPolicy retryPolicy,
bool trackLastEnqueuedEventProperties,
long? ownerLevel,
uint? prefetchCount)
uint? prefetchCount,
long? prefetchSizeInBytes)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));

Expand All @@ -439,6 +441,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
trackLastEnqueuedEventProperties,
ownerLevel,
prefetchCount,
prefetchSizeInBytes,
ConnectionScope,
MessageConverter,
retryPolicy
Expand Down
Loading