Skip to content

Commit

Permalink
Track publisher confirmations automatically
Browse files Browse the repository at this point in the history
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
  • Loading branch information
lukebakken committed Oct 2, 2024
1 parent b2975b0 commit 0b56024
Show file tree
Hide file tree
Showing 29 changed files with 234 additions and 100 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static class Constants
/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// and <see cref="IConnection.CreateChannelAsync(bool, bool, ushort?, System.Threading.CancellationToken)" />
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
Expand Down
8 changes: 0 additions & 8 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously enable publisher confirmations.
/// </summary>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
Task ConfirmSelectAsync(bool trackConfirmations = true,
CancellationToken cancellationToken = default);

/// <summary>Asynchronously declare an exchange.</summary>
/// <param name="exchange">The name of the exchange.</param>
/// <param name="type">The type of the exchange.</param>
Expand Down
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
/// <param name="publisherConfirmations">
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
/// </param>
/// <param name="publisherConfirmationTracking">
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
/// </param>
/// <param name="consumerDispatchConcurrency">
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
Expand All @@ -251,7 +257,8 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default);
}
}
14 changes: 12 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -251,16 +251,26 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default)
{
EnsureIsOpen();

ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
.ConfigureAwait(false);

AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
if (publisherConfirmations)
{
await channel.ConfirmSelectAsync(trackConfirmations: publisherConfirmationTracking,
cancellationToken: cancellationToken).ConfigureAwait(false);
}

await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);

return channel;
}

Expand Down
23 changes: 20 additions & 3 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,30 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
var channel = new Channel(_config, session, consumerDispatchConcurrency);
return channel.OpenAsync(cancellationToken);
IChannel ch = await channel.OpenAsync(cancellationToken)
.ConfigureAwait(false);
if (publisherConfirmations)
{
// TODO yes this is ugly but will be fixed as part of rabbitmq/rabbitmq-dotnet-client#1682
if (ch is not AutorecoveringChannel ac)
{
ChannelBase chb = (ChannelBase)ch;
await chb.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken)
.ConfigureAwait(false);
}
else
{
await ac.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken)
.ConfigureAwait(false);
}
}
return ch;
}

internal ISession CreateSession()
Expand Down
2 changes: 0 additions & 2 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!
RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs!>!
RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs!>!
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
4 changes: 2 additions & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer

publishTasks.Add(Task.Run(async () =>
{
using IChannel publishChannel = await publishConnection.CreateChannelAsync();
using IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmations: true,
publisherConfirmationTracking: true);
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
await publishChannel.ConfirmSelectAsync();
for (int i = 0; i < ItemsPerBatch; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ static async Task PublishMessagesIndividuallyAsync()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmations: true,
publisherConfirmationTracking: true);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
await channel.ConfirmSelectAsync();

var sw = new Stopwatch();
sw.Start();
Expand Down Expand Up @@ -57,7 +57,6 @@ static async Task PublishMessagesInBatchAsync()
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
await channel.ConfirmSelectAsync();

int batchSize = 100;
int outstandingMessageCount = 0;
Expand Down Expand Up @@ -98,16 +97,15 @@ async Task HandlePublishConfirmsAsynchronously()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// NOTE: setting trackConfirmations to false because this program
// is tracking them itself.
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationTracking: false);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

// NOTE: setting trackConfirmations to false because this program
// is tracking them itself.
await channel.ConfirmSelectAsync(trackConfirmations: false);

bool publishingCompleted = false;
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var outstandingConfirms = new LinkedList<ulong>();
Expand Down
31 changes: 18 additions & 13 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -71,17 +72,16 @@ protected async Task AssertConsumerCountAsync(IChannel ch, string q, uint count)
Assert.Equal(count, ok.ConsumerCount);
}

protected async Task AssertExchangeRecoveryAsync(IChannel m, string x)
protected async Task AssertExchangeRecoveryAsync(IChannel ch, string x)
{
await m.ConfirmSelectAsync();
await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) =>
await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) =>
{
string rk = "routing-key";
await m.QueueBindAsync(q, x, rk);
await m.BasicPublishAsync(x, rk, _messageBody);
await ch.QueueBindAsync(q, x, rk);
await ch.BasicPublishAsync(x, rk, _messageBody);
Assert.True(await WaitForConfirmsWithCancellationAsync(m));
await m.ExchangeDeclarePassiveAsync(x);
Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
await ch.ExchangeDeclarePassiveAsync(x);
});
}

Expand All @@ -92,7 +92,13 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q)

protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary<string, object> arguments = null)
{
await ch.ConfirmSelectAsync();
// TODO
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
await ach.ConfirmSelectAsync(trackConfirmations: true);

// Note: no need to enable publisher confirmations as they are
// automatically enabled for channels
await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null);

RabbitMQ.Client.QueueDeclareOk ok1 = await ch.QueueDeclareAsync(queue: q, passive: false,
Expand Down Expand Up @@ -204,9 +210,10 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
{
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
{
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync())
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true))
{
await publishingChannel.ConfirmSelectAsync();
// Note: no need to enable publisher confirmations as they are
// automatically enabled for channels

for (ushort i = 0; i < TotalMessageCount; i++)
{
Expand Down Expand Up @@ -358,10 +365,8 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,

protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
{
using (IChannel ch = await conn.CreateChannelAsync())
using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true))
{
await ch.ConfirmSelectAsync();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var consumer = new AckingBasicConsumer(ch, 1, tcs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery()
[Fact]
public async Task TestBasicAckEventHandlerRecovery()
{
await _channel.ConfirmSelectAsync();
// TODO
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
await ach.ConfirmSelectAsync(trackConfirmations: false);

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Impl;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -45,6 +46,11 @@ public TestExchangeRecovery(ITestOutputHelper output) : base(output)
[Fact]
public async Task TestExchangeRecoveryTest()
{
// TODO
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
await ach.ConfirmSelectAsync(trackConfirmations: true);

string x = "dotnet-client.test.recovery.x1";
await DeclareNonDurableExchangeAsync(_channel, x);
await CloseAndWaitForRecoveryAsync();
Expand All @@ -55,7 +61,10 @@ public async Task TestExchangeRecoveryTest()
[Fact]
public async Task TestExchangeToExchangeBindingRecovery()
{
await _channel.ConfirmSelectAsync();
// TODO
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
await ach.ConfirmSelectAsync(trackConfirmations: true);

string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;

Expand Down
16 changes: 9 additions & 7 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
await using (IChannel publishChannel = await publishConn.CreateChannelAsync())
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true))
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
Expand All @@ -226,7 +226,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
await publishChannel.ConfirmSelectAsync();
for (int i = 0; i < publish_total; i++)
{
Expand Down Expand Up @@ -436,6 +435,11 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
[Fact]
public async Task TestBasicAckAsync()
{
// TODO
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
await ach.ConfirmSelectAsync(trackConfirmations: true);

await ValidateConsumerDispatchConcurrency();

string queueName = GenerateQueueName();
Expand Down Expand Up @@ -463,8 +467,6 @@ public async Task TestBasicAckAsync()
return Task.CompletedTask;
};

await _channel.ConfirmSelectAsync();

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) =>
{
Expand Down Expand Up @@ -649,8 +651,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.ReceivedAsync += async (sender, args) =>
{
await using IChannel innerChannel = await _conn.CreateChannelAsync();
await innerChannel.ConfirmSelectAsync();
await using IChannel innerChannel = await _conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true);
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
mandatory: true,
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
Expand All @@ -667,7 +668,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
};
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);

await _channel.ConfirmSelectAsync();
// Note: no need to enable publisher confirmations as they are
// automatically enabled for channels
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
await _channel.WaitForConfirmsOrDieAsync();

Expand Down
Loading

0 comments on commit 0b56024

Please sign in to comment.