diff --git a/projects/RabbitMQ.Client/Constants.cs b/projects/RabbitMQ.Client/Constants.cs index 637d53a78..e456d0da9 100644 --- a/projects/RabbitMQ.Client/Constants.cs +++ b/projects/RabbitMQ.Client/Constants.cs @@ -87,7 +87,7 @@ public static class Constants /// /// The default consumer dispatch concurrency. See /// to set this value for every channel created on a connection, - /// and + /// and /// for setting this value for a particular channel. /// public const ushort DefaultConsumerDispatchConcurrency = 1; diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index 63fc72756..0067712c2 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -265,14 +265,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort, Task CloseAsync(ShutdownEventArgs reason, bool abort, CancellationToken cancellationToken = default); - /// - /// Asynchronously enable publisher confirmations. - /// - /// Set to false if tracking via and yourself. - /// CancellationToken for this operation. - Task ConfirmSelectAsync(bool trackConfirmations = true, - CancellationToken cancellationToken = default); - /// Asynchronously declare an exchange. /// The name of the exchange. /// The type of the exchange. diff --git a/projects/RabbitMQ.Client/IConnection.cs b/projects/RabbitMQ.Client/IConnection.cs index c562307a7..e969d9c13 100644 --- a/projects/RabbitMQ.Client/IConnection.cs +++ b/projects/RabbitMQ.Client/IConnection.cs @@ -240,6 +240,12 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// /// Asynchronously create and return a fresh channel, session, and channel. /// + /// + /// Enable or disable publisher confirmations on this channel. Defaults to false + /// + /// + /// Should this library track publisher confirmations for you? Defaults to false + /// /// /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one /// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. @@ -251,7 +257,8 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo /// In addition to that consumers need to be thread/concurrency safe. /// /// Cancellation token - Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, + Task CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, + ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index dc99e33fc..326a21bb5 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -251,16 +251,26 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, - CancellationToken cancellationToken = default) + public async Task CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, + ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { EnsureIsOpen(); + ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency); + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken) .ConfigureAwait(false); + AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc); + if (publisherConfirmations) + { + await channel.ConfirmSelectAsync(trackConfirmations: publisherConfirmationTracking, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); + return channel; } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 4a733ac8c..eb8769d14 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -264,13 +264,30 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync(ushort? consumerDispatchConcurrency = null, - CancellationToken cancellationToken = default) + public async Task CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, + ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default) { EnsureIsOpen(); ISession session = CreateSession(); var channel = new Channel(_config, session, consumerDispatchConcurrency); - return channel.OpenAsync(cancellationToken); + IChannel ch = await channel.OpenAsync(cancellationToken) + .ConfigureAwait(false); + if (publisherConfirmations) + { + // TODO yes this is ugly but will be fixed as part of rabbitmq/rabbitmq-dotnet-client#1682 + if (ch is not AutorecoveringChannel ac) + { + ChannelBase chb = (ChannelBase)ch; + await chb.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken) + .ConfigureAwait(false); + } + else + { + await ac.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken) + .ConfigureAwait(false); + } + } + return ch; } internal ISession CreateSession() diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 9b4e290f2..7222fd988 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -825,10 +825,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort -RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb..95e3d1941 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1 @@ +RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! \ No newline at end of file diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 2d6ef5e63..e666fe4b1 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -137,10 +137,10 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer publishTasks.Add(Task.Run(async () => { - using IChannel publishChannel = await publishConnection.CreateChannelAsync(); + using IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmations: true, + publisherConfirmationTracking: true); publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; - await publishChannel.ConfirmSelectAsync(); for (int i = 0; i < ItemsPerBatch; i++) { diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 16b597459..39b6fb8cd 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -24,12 +24,12 @@ static async Task PublishMessagesIndividuallyAsync() Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(); + await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmations: true, + publisherConfirmationTracking: true); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - await channel.ConfirmSelectAsync(); var sw = new Stopwatch(); sw.Start(); @@ -57,7 +57,6 @@ static async Task PublishMessagesInBatchAsync() // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - await channel.ConfirmSelectAsync(); int batchSize = 100; int outstandingMessageCount = 0; @@ -98,16 +97,15 @@ async Task HandlePublishConfirmsAsynchronously() Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously"); await using IConnection connection = await CreateConnectionAsync(); - await using IChannel channel = await connection.CreateChannelAsync(); + + // NOTE: setting trackConfirmations to false because this program + // is tracking them itself. + await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationTracking: false); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - // NOTE: setting trackConfirmations to false because this program - // is tracking them itself. - await channel.ConfirmSelectAsync(trackConfirmations: false); - bool publishingCompleted = false; var allMessagesConfirmedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var outstandingConfirms = new LinkedList(); diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 8b2e7edc3..1ac46fc6d 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -35,6 +35,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -71,17 +72,16 @@ protected async Task AssertConsumerCountAsync(IChannel ch, string q, uint count) Assert.Equal(count, ok.ConsumerCount); } - protected async Task AssertExchangeRecoveryAsync(IChannel m, string x) + protected async Task AssertExchangeRecoveryAsync(IChannel ch, string x) { - await m.ConfirmSelectAsync(); - await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) => + await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) => { string rk = "routing-key"; - await m.QueueBindAsync(q, x, rk); - await m.BasicPublishAsync(x, rk, _messageBody); + await ch.QueueBindAsync(q, x, rk); + await ch.BasicPublishAsync(x, rk, _messageBody); - Assert.True(await WaitForConfirmsWithCancellationAsync(m)); - await m.ExchangeDeclarePassiveAsync(x); + Assert.True(await WaitForConfirmsWithCancellationAsync(ch)); + await ch.ExchangeDeclarePassiveAsync(x); }); } @@ -92,7 +92,13 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q) protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary arguments = null) { - await ch.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)ch; + await ach.ConfirmSelectAsync(trackConfirmations: true); + + // Note: no need to enable publisher confirmations as they are + // automatically enabled for channels await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null); RabbitMQ.Client.QueueDeclareOk ok1 = await ch.QueueDeclareAsync(queue: q, passive: false, @@ -204,9 +210,10 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync()) { - using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) + using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true)) { - await publishingChannel.ConfirmSelectAsync(); + // Note: no need to enable publisher confirmations as they are + // automatically enabled for channels for (ushort i = 0; i < TotalMessageCount; i++) { @@ -358,10 +365,8 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag, protected static async Task SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey) { - using (IChannel ch = await conn.CreateChannelAsync()) + using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true)) { - await ch.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer = new AckingBasicConsumer(ch, 1, tcs); diff --git a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs index c3da0f291..62e8f3067 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs @@ -154,7 +154,11 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery() [Fact] public async Task TestBasicAckEventHandlerRecovery() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: false); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) => { diff --git a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs index 0cdfe8e17..04045fc7a 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs @@ -31,6 +31,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -45,6 +46,11 @@ public TestExchangeRecovery(ITestOutputHelper output) : base(output) [Fact] public async Task TestExchangeRecoveryTest() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + string x = "dotnet-client.test.recovery.x1"; await DeclareNonDurableExchangeAsync(_channel, x); await CloseAndWaitForRecoveryAsync(); @@ -55,7 +61,10 @@ public async Task TestExchangeRecoveryTest() [Fact] public async Task TestExchangeToExchangeBindingRecovery() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index f6b17f3ba..50b13e108 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConn.CreateChannelAsync()) + await using (IChannel publishChannel = await publishConn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true)) { AddCallbackExceptionHandlers(publishConn, publishChannel); publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, @@ -226,7 +226,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }); return Task.CompletedTask; }; - await publishChannel.ConfirmSelectAsync(); for (int i = 0; i < publish_total; i++) { @@ -436,6 +435,11 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, [Fact] public async Task TestBasicAckAsync() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + await ValidateConsumerDispatchConcurrency(); string queueName = GenerateQueueName(); @@ -463,8 +467,6 @@ public async Task TestBasicAckAsync() return Task.CompletedTask; }; - await _channel.ConfirmSelectAsync(); - var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => { @@ -649,8 +651,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() var consumer1 = new AsyncEventingBasicConsumer(_channel); consumer1.ReceivedAsync += async (sender, args) => { - await using IChannel innerChannel = await _conn.CreateChannelAsync(); - await innerChannel.ConfirmSelectAsync(); + await using IChannel innerChannel = await _conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true, body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); @@ -667,7 +668,8 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name, }; await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2); - await _channel.ConfirmSelectAsync(); + // Note: no need to enable publisher confirmations as they are + // automatically enabled for channels await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024)); await _channel.WaitForConfirmsOrDieAsync(); diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index ad4650e2a..ce0a56055 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -31,6 +31,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -45,11 +46,15 @@ public TestBasicPublishAsync(ITestOutputHelper output) : base(output) [Fact] public async Task TestQueuePurgeAsync() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + const int messageCount = 1024; var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - await _channel.ConfirmSelectAsync(); QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index 26f83eb34..07467401d 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -37,6 +37,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -54,6 +55,11 @@ public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output) [Fact] public async Task ConcurrentPublishSingleChannel() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: false); + int publishAckCount = 0; _channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) => @@ -68,7 +74,6 @@ public async Task ConcurrentPublishSingleChannel() return Task.CompletedTask; }; - await _channel.ConfirmSelectAsync(trackConfirmations: false); await TestConcurrentOperationsAsync(async () => { diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index bea692094..05f588d73 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -108,7 +108,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in try { - await using IChannel ch = await _conn.CreateChannelAsync(); + await using IChannel ch = await _conn.CreateChannelAsync(publisherConfirmations: true); ch.ChannelShutdownAsync += (o, ea) => { HandleChannelShutdown(ch, ea, (args) => @@ -121,7 +121,6 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in return Task.CompletedTask; }; - await ch.ConfirmSelectAsync(trackConfirmations: false); ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) => { diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index d2b9a38a5..2f51b6e4c 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -32,6 +32,7 @@ using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -46,20 +47,23 @@ public TestConfirmSelect(ITestOutputHelper output) : base(output) [Fact] public async Task TestConfirmSelectIdempotency() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + ValueTask PublishAsync() { return _channel.BasicPublishAsync(exchange: "", routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message")); } - await _channel.ConfirmSelectAsync(); Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.ConfirmSelectAsync(); await PublishAsync(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); await PublishAsync(); @@ -73,11 +77,15 @@ ValueTask PublishAsync() [InlineData(256)] public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + byte[] body = GetRandomBody(16); await _channel.ExchangeDeclareAsync("sample", "fanout", autoDelete: true); // _channel.BasicAcks += (s, e) => _output.WriteLine("Acked {0}", e.DeliveryTag); - await _channel.ConfirmSelectAsync(); var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); diff --git a/projects/Test/Integration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs index 0681ef05a..c75bed955 100644 --- a/projects/Test/Integration/TestConfirmSelectAsync.cs +++ b/projects/Test/Integration/TestConfirmSelectAsync.cs @@ -32,6 +32,7 @@ using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -48,14 +49,17 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output) [Fact] public async Task TestConfirmSelectIdempotency() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.ConfirmSelectAsync(); await Publish(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); await Publish(); diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index 8addefa19..fc5f2cbdc 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -289,9 +289,8 @@ public async Task TestTopologyRecoveryConsumerFilter() return Task.CompletedTask; }; - await using (IChannel ch = await conn.CreateChannelAsync()) + await using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true)) { - await ch.ConfirmSelectAsync(); await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index ffe7d7847..ecb2ad964 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -37,6 +37,7 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -155,7 +156,7 @@ public async Task TestTopologyRecoveryExchangeFilter() tcs.SetResult(true); return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); try { await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true); @@ -270,10 +271,9 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() return Task.CompletedTask; }; - IChannel ch = await conn.CreateChannelAsync(); + IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); try { - await ch.ConfirmSelectAsync(); await ch.ExchangeDeclareAsync(exchange, "direct"); await ch.QueueDeclareAsync(queue1, false, false, false); @@ -399,6 +399,11 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals [Fact] public async Task TestTopologyRecoveryExchangeExceptionHandler() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + string exchangeToRecoverWithException = GenerateExchangeName() + "-recovery.exception.exchange"; string exchangeToRecoverSuccessfully = GenerateExchangeName() + "-successfully.recovered.exchange"; @@ -554,7 +559,8 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() IChannel ch = await conn.CreateChannelAsync(); try { - await ch.ConfirmSelectAsync(); + // Note: no need to enable publisher confirmations as they are + // automatically enabled for channels await _channel.QueueDeclareAsync(queueWithExceptionConsumer, false, false, false); await _channel.QueuePurgeAsync(queueWithExceptionConsumer); diff --git a/projects/Test/Integration/TestExtensions.cs b/projects/Test/Integration/TestExtensions.cs index fd55296e6..8cea70f24 100644 --- a/projects/Test/Integration/TestExtensions.cs +++ b/projects/Test/Integration/TestExtensions.cs @@ -32,6 +32,7 @@ using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -46,7 +47,11 @@ public TestExtensions(ITestOutputHelper output) : base(output) [Fact] public async Task TestConfirmBarrier() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + for (int i = 0; i < 10; i++) { await _channel.BasicPublishAsync(string.Empty, string.Empty, Array.Empty()); @@ -63,7 +68,10 @@ public async Task TestConfirmBeforeWait() [Fact] public async Task TestExchangeBinding() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); await _channel.ExchangeDeclareAsync("src", ExchangeType.Direct, false, false); await _channel.ExchangeDeclareAsync("dest", ExchangeType.Direct, false, false); diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 836aead2f..224fb4401 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -181,9 +181,8 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; - await using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) + await using (IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true)) { - await publishChannel.ConfirmSelectAsync(); publishChannel.ChannelShutdownAsync += (o, ea) => { diff --git a/projects/Test/Integration/TestMessageCount.cs b/projects/Test/Integration/TestMessageCount.cs index beca50f6d..bf5109f40 100644 --- a/projects/Test/Integration/TestMessageCount.cs +++ b/projects/Test/Integration/TestMessageCount.cs @@ -31,6 +31,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -45,7 +46,11 @@ public TestMessageCount(ITestOutputHelper output) : base(output) [Fact] public async Task TestMessageCountMethod() { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + string q = GenerateQueueName(); await _channel.QueueDeclareAsync(queue: q, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); Assert.Equal(0u, await _channel.MessageCountAsync(q)); diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 80df6ea72..1c1c725b1 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -111,8 +111,7 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout public async Task TestWaitForConfirmsWithEventsAsync() { string queueName = GenerateQueueName(); - await using IChannel ch = await _conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await _conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); @@ -151,10 +150,9 @@ await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn) { string queueName = GenerateQueueName(); - await using IChannel ch = await _conn.CreateChannelAsync(); + await using IChannel ch = await _conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); var props = new BasicProperties { Persistent = true }; - await ch.ConfirmSelectAsync(); await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null); diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 725d7f7a5..a04ac0b98 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -125,19 +125,18 @@ public async Task TestCloseConnection() async Task PublishLoop() { - await using IChannel ch = await conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); messagePublishedTcs.TrySetResult(true); /* - * Note: - * In this test, it is possible that the connection - * will be closed before the ack is returned, - * and this await will throw an exception - */ + * Note: + * In this test, it is possible that the connection + * will be closed before the ack is returned, + * and this await will throw an exception + */ try { await ch.WaitForConfirmsAsync(); @@ -206,8 +205,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() Task pubTask = Task.Run(async () => { await using IConnection conn = await cf.CreateConnectionAsync(); - await using IChannel ch = await conn.CreateChannelAsync(); - await ch.ConfirmSelectAsync(); + await using IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); QueueDeclareOk q = await ch.QueueDeclareAsync(); while (conn.IsOpen) { diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index c4abf5f94..fecd0d348 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -230,8 +230,7 @@ public async Task SecondConnectionCrashes_GH1429() private async Task DeclarePublishChannelAsync() { Assert.NotNull(_connection); - IChannel publishChannel = await _connection.CreateChannelAsync(); - await publishChannel.ConfirmSelectAsync(); + IChannel publishChannel = await _connection.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true); await publishChannel.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false); return publishChannel; } diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 107168f9f..4a6715e4a 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -38,6 +38,7 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; @@ -80,7 +81,10 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) [InlineData(false)] public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); @@ -117,7 +121,10 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera [InlineData(false)] public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); @@ -156,7 +163,10 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); @@ -194,7 +204,10 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo [InlineData(false)] public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); @@ -232,7 +245,10 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs [InlineData(false)] public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); @@ -272,7 +288,10 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); @@ -311,7 +330,10 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn [InlineData(false)] public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -344,7 +366,10 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera [InlineData(false)] public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); @@ -379,7 +404,10 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { - await _channel.ConfirmSelectAsync(); + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); using ActivityListener activityListener = StartActivityListener(activities); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index abf8e238b..7480095e3 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -35,6 +35,7 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk; @@ -180,6 +181,11 @@ public Task TestClientNamedQueueRecoveryOnServerRestart() [Fact] public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + string queueName = GenerateQueueName(); string exchangeName = GenerateExchangeName(); try @@ -194,7 +200,6 @@ public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery() await RestartServerAndWaitForRecoveryAsync(); Assert.True(_channel.IsOpen); - await _channel.ConfirmSelectAsync(); QueueDeclareOk ok0 = await _channel.QueueDeclarePassiveAsync(queue: queueName); Assert.Equal(queueName, ok0.QueueName); await _channel.QueuePurgeAsync(queueName); @@ -217,6 +222,11 @@ public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery() [Fact] public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + string x = "tmp-fanout"; await _channel.ExchangeDeleteAsync(x); await _channel.ExchangeDeclareAsync(exchange: x, type: "fanout"); @@ -240,7 +250,6 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() Assert.True(_channel.IsOpen); Assert.NotEqual(nameBefore, nameAfter); - await _channel.ConfirmSelectAsync(); await _channel.ExchangeDeclareAsync(exchange: x, type: "fanout"); await _channel.BasicPublishAsync(exchange: x, routingKey: "", body: _encoding.GetBytes("msg")); await WaitForConfirmsWithCancellationAsync(_channel); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 7a99a00e6..84c795afd 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -40,6 +40,7 @@ using OpenTelemetry.Trace; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; using Xunit.Sdk; @@ -87,6 +88,11 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) [InlineData(false)] public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() .AddRabbitMQInstrumentation() @@ -95,7 +101,6 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -142,6 +147,11 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera [InlineData(false)] public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() .AddRabbitMQInstrumentation() @@ -150,7 +160,6 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -198,6 +207,11 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs [InlineData(false)] public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() .AddRabbitMQInstrumentation() @@ -206,7 +220,6 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -255,6 +268,11 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn [InlineData(false)] public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() .AddRabbitMQInstrumentation() @@ -263,7 +281,6 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); @@ -313,6 +330,11 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo [InlineData(false)] public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { + // TODO + // Hack for rabbitmq/rabbitmq-dotnet-client#1682 + AutorecoveringChannel ach = (AutorecoveringChannel)_channel; + await ach.ConfirmSelectAsync(trackConfirmations: true); + var exportedItems = new List(); using var tracer = Sdk.CreateTracerProviderBuilder() .AddRabbitMQInstrumentation() @@ -321,7 +343,6 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera string baggageGuid = Guid.NewGuid().ToString(); Baggage.SetBaggage("TestItem", baggageGuid); Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; await Task.Delay(500); string queue = $"queue-{Guid.NewGuid()}";