Skip to content

Commit

Permalink
test(LT-5705): covered message read strategy configuration with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
atarutin committed Sep 24, 2024
1 parent c477a04 commit bae0d2c
Show file tree
Hide file tree
Showing 11 changed files with 1,541 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2024 Lykke Corp.
// See the LICENSE file in the project root for more information.

using System;

using RabbitMQ.Client;

namespace Lykke.RabbitMqBroker.Subscriber.MessageReadStrategies;

internal static class MessageReadStrategyConfigurationExtensions
{
public static IConfigurationResult<QueueName> StrategyTryConfigure(this Func<IModel> channelFactory, QueueConfigurationOptions options)
{
var queueConfigurationResult = QueueConfigurator.Configure(channelFactory, options);

if (queueConfigurationResult.IsSuccess && options.DeadLetterExchangeName is not null)
{
var dlxConfigurationResult = ExchangeConfigurator.ConfigureDlx(channelFactory, options).Match(
onSuccess: () => QueueConfigurator.ConfigurePoison(channelFactory, options));

return dlxConfigurationResult.IsSuccess
? queueConfigurationResult
: ConfigurationResult<QueueName>.Failure(dlxConfigurationResult.Error);
}

return queueConfigurationResult;
}

public static IConfigurationResult<QueueName> StrategyRetryWithQueueRecreation(this Func<IModel> channelFactory, QueueConfigurationOptions options)
{
return channelFactory.SafeDeleteQueue(options.QueueName.ToString()).Match(
onSuccess: _ => StrategyTryConfigure(channelFactory, options),
onFailure: _ => throw new InvalidOperationException($"Failed to delete queue [{options.QueueName}]."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,13 @@ public QueueName Configure(RabbitMqSubscriptionSettings settings, Func<IModel> c
{
var options = CreateQueueConfigurationOptions(settings);

var (queueName, _) = TryConfigure(channelFactory, options).Match(
onFailure: _ => RetryWithQueueRecreation(channelFactory, options).Match(
var (queueName, _) = channelFactory.StrategyTryConfigure(options).Match(
onFailure: _ => channelFactory.StrategyRetryWithQueueRecreation(options).Match(
onFailure: _ => throw new InvalidOperationException($"Failed to configure queue [{options.QueueName}] after precondition failure")));

return queueName;
}

private static IConfigurationResult<QueueName> TryConfigure(Func<IModel> channelFactory, QueueConfigurationOptions options)
{
var queueConfigurationResult = QueueConfigurator.Configure(channelFactory, options);

if (queueConfigurationResult.IsSuccess && options.DeadLetterExchangeName is not null)
{
var dlxConfigurationResult = ExchangeConfigurator.ConfigureDlx(channelFactory, options).Match(
onSuccess: () => QueueConfigurator.ConfigurePoison(channelFactory, options));

return dlxConfigurationResult.IsSuccess
? queueConfigurationResult
: ConfigurationResult<QueueName>.Failure(dlxConfigurationResult.Error);
}

return queueConfigurationResult;
}

private static IConfigurationResult<QueueName> RetryWithQueueRecreation(Func<IModel> channelFactory, QueueConfigurationOptions options)
{
return channelFactory.SafeDeleteClassicQueue(options.QueueName.ToString()).Match(
onSuccess: _ => TryConfigure(channelFactory, options),
onFailure: _ => throw new InvalidOperationException($"Failed to delete queue [{options.QueueName}]."));
}

private QueueConfigurationOptions CreateQueueConfigurationOptions(RabbitMqSubscriptionSettings settings)
{
var effectiveRoutingKey = _routingKey == RoutingKey.Empty
Expand All @@ -64,8 +40,20 @@ private QueueConfigurationOptions CreateQueueConfigurationOptions(RabbitMqSubscr

return QueueType switch
{
QueueType.Classic => QueueConfigurationOptions.ForClassicQueue(settings.GetQueueName(), ExchangeName.Create(settings.ExchangeName), string.IsNullOrWhiteSpace(settings.DeadLetterExchangeName) ? null : DeadLetterExchangeName.Create(settings.DeadLetterExchangeName), StrategyDefaultDeadLetterExchangeType, Durable, AutoDelete, effectiveRoutingKey),
QueueType.Quorum => QueueConfigurationOptions.ForQuorumQueue(settings.GetQueueName(), ExchangeName.Create(settings.ExchangeName), string.IsNullOrWhiteSpace(settings.DeadLetterExchangeName) ? null : DeadLetterExchangeName.Create(settings.DeadLetterExchangeName), StrategyDefaultDeadLetterExchangeType, effectiveRoutingKey),
QueueType.Classic => QueueConfigurationOptions.ForClassicQueue(
settings.GetQueueName(),
ExchangeName.Create(settings.ExchangeName),
string.IsNullOrWhiteSpace(settings.DeadLetterExchangeName) ? null : DeadLetterExchangeName.Create(settings.DeadLetterExchangeName),
StrategyDefaultDeadLetterExchangeType,
Durable,
AutoDelete,
effectiveRoutingKey),
QueueType.Quorum => QueueConfigurationOptions.ForQuorumQueue(
settings.GetQueueName(),
ExchangeName.Create(settings.ExchangeName),
string.IsNullOrWhiteSpace(settings.DeadLetterExchangeName) ? null : DeadLetterExchangeName.Create(settings.DeadLetterExchangeName),
StrategyDefaultDeadLetterExchangeType,
effectiveRoutingKey),
_ => throw new InvalidOperationException($"Unsupported queue type [{QueueType}]")
};
}
Expand Down
14 changes: 10 additions & 4 deletions tests/Lykke.RabbitMqBroker.Tests/ExchangeConfiguratorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public void Configure_DeclaresExchange()
AutoDelete: false
);

var result = ExchangeConfigurator.Configure(() => new ConfiguratorFakeChannel(), options);
var result = ExchangeConfigurator.Configure(() => new PrimitivesConfiguratorFakeChannel(), options);

Assert.That(result.IsSuccess);
Assert.That(ConfiguratorFakeChannel.DeclaredExchanges, Does.Contain(options.ExchangeName.ToString()));
Assert.That(PrimitivesConfiguratorFakeChannel.DeclaredExchanges, Does.Contain(options.ExchangeName.ToString()));
}

[Test]
Expand All @@ -35,9 +35,15 @@ public void ConfigureDlx_DeclaresDlxExchange()
RoutingKey: RoutingKey.Empty
);

var result = ExchangeConfigurator.ConfigureDlx(() => new ConfiguratorFakeChannel(), originalQueueOptions);
var result = ExchangeConfigurator.ConfigureDlx(() => new PrimitivesConfiguratorFakeChannel(), originalQueueOptions);

Assert.That(result.IsSuccess);
Assert.That(ConfiguratorFakeChannel.DeclaredExchanges, Does.Contain(originalQueueOptions.DeadLetterExchangeName.ToString()));
Assert.That(PrimitivesConfiguratorFakeChannel.DeclaredExchanges, Does.Contain(originalQueueOptions.DeadLetterExchangeName.ToString()));
}

[TearDown]
public void TearDown()
{
PrimitivesConfiguratorFakeChannel.ResetCounters();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
using System;
using System.Collections.Generic;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Lykke.RabbitMqBroker.Tests.Fakes;

public class ExchangeDeclarationExpectedFailureFakeChannel : IModel
{
public int ChannelNumber => throw new NotImplementedException();

public ShutdownEventArgs CloseReason => throw new NotImplementedException();

public IBasicConsumer DefaultConsumer { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

public bool IsClosed => throw new NotImplementedException();

public bool IsOpen => throw new NotImplementedException();

public ulong NextPublishSeqNo => throw new NotImplementedException();

public TimeSpan ContinuationTimeout { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

public event EventHandler<BasicAckEventArgs> BasicAcks;
public event EventHandler<BasicNackEventArgs> BasicNacks;
public event EventHandler<EventArgs> BasicRecoverOk;
public event EventHandler<BasicReturnEventArgs> BasicReturn;
public event EventHandler<CallbackExceptionEventArgs> CallbackException;
public event EventHandler<FlowControlEventArgs> FlowControl;
public event EventHandler<ShutdownEventArgs> ModelShutdown;

public void Abort()
{
throw new NotImplementedException();
}

public void Abort(ushort replyCode, string replyText)
{
throw new NotImplementedException();
}

public void BasicAck(ulong deliveryTag, bool multiple)
{
throw new NotImplementedException();
}

public void BasicCancel(string consumerTag)
{
throw new NotImplementedException();
}

public void BasicCancelNoWait(string consumerTag)
{
throw new NotImplementedException();
}

public string BasicConsume(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer)
{
throw new NotImplementedException();
}

public BasicGetResult BasicGet(string queue, bool autoAck)
{
throw new NotImplementedException();
}

public void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
{
throw new NotImplementedException();
}

public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
throw new NotImplementedException();
}

public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
throw new NotImplementedException();
}

public void BasicRecover(bool requeue)
{
throw new NotImplementedException();
}

public void BasicRecoverAsync(bool requeue)
{
throw new NotImplementedException();
}

public void BasicReject(ulong deliveryTag, bool requeue)
{
throw new NotImplementedException();
}

public void Close()
{
throw new NotImplementedException();
}

public void Close(ushort replyCode, string replyText)
{
throw new NotImplementedException();
}

public void ConfirmSelect()
{
throw new NotImplementedException();
}

public uint ConsumerCount(string queue)
{
throw new NotImplementedException();
}

public IBasicProperties CreateBasicProperties()
{
throw new NotImplementedException();
}

public IBasicPublishBatch CreateBasicPublishBatch()
{
throw new NotImplementedException();
}

public void Dispose()
{
}

public void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
{
throw new NotImplementedException();
}

public void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments)
{
throw new NotImplementedException();
}

public void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
{
throw new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Application, Constants.PreconditionFailed, "Precondition failed", nameof(ExchangeDeclare)));
}

public void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
{
throw new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Application, Constants.PreconditionFailed, "Precondition failed", nameof(ExchangeDeclareNoWait)));
}

public void ExchangeDeclarePassive(string exchange)
{
throw new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Application, Constants.PreconditionFailed, "Precondition failed", nameof(ExchangeDeclarePassive)));
}

public void ExchangeDelete(string exchange, bool ifUnused)
{
throw new NotImplementedException();
}

public void ExchangeDeleteNoWait(string exchange, bool ifUnused)
{
throw new NotImplementedException();
}

public void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments)
{
throw new NotImplementedException();
}

public void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments)
{
throw new NotImplementedException();
}

public uint MessageCount(string queue)
{
throw new NotImplementedException();
}

public void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
}

public void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
}

public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
return new QueueDeclareOk(queue, 0, 0);
}

public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
}

public QueueDeclareOk QueueDeclarePassive(string queue)
{
return new QueueDeclareOk(queue, 0, 0);
}

public uint QueueDelete(string queue, bool ifUnused, bool ifEmpty)
{
throw new NotImplementedException();
}

public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty)
{
throw new NotImplementedException();
}

public uint QueuePurge(string queue)
{
throw new NotImplementedException();
}

public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
throw new NotImplementedException();
}

public void TxCommit()
{
throw new NotImplementedException();
}

public void TxRollback()
{
throw new NotImplementedException();
}

public void TxSelect()
{
throw new NotImplementedException();
}

public bool WaitForConfirms()
{
throw new NotImplementedException();
}

public bool WaitForConfirms(TimeSpan timeout)
{
throw new NotImplementedException();
}

public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
{
throw new NotImplementedException();
}

public void WaitForConfirmsOrDie()
{
throw new NotImplementedException();
}

public void WaitForConfirmsOrDie(TimeSpan timeout)
{
throw new NotImplementedException();
}
}
Loading

0 comments on commit bae0d2c

Please sign in to comment.