Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lt 5705 rabbit mq broker add quorum queues support for nl queues #65

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 4 additions & 36 deletions src/Lykke.RabbitMqBroker/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Copyright (c) Lykke Corp.
// Licensed under the MIT License. See the LICENSE file in the project root for more information.

using System;
using Lykke.RabbitMqBroker.Subscriber;

namespace Lykke.RabbitMqBroker
{
internal static class RabbitMqSettingsExtension
{
private const string PoisonQueueSuffix = "poison";

internal static string GetPublisherDisplayName(this RabbitMqSubscriptionSettings settings)
{
return $"Publisher {settings.ExchangeName}";
Expand All @@ -29,41 +27,11 @@ internal static string GetQueueOrExchangeDisplayName(this RabbitMqSubscriptionSe
/// </summary>
/// <param name="settings"></param>
/// <returns></returns>
internal static string GetQueueName(this RabbitMqSubscriptionSettings settings)
internal static QueueName GetQueueName(this RabbitMqSubscriptionSettings settings)
{
return string.IsNullOrEmpty(settings.QueueName)
? settings.ExchangeName + "." + Guid.NewGuid()
: settings.QueueName;
}

/// <summary>
/// Gets the poison queue name. If the queue name is not set, it will be generated.
/// </summary>
/// <param name="settings"></param>
/// <returns></returns>
internal static string GetPoisonQueueName(this RabbitMqSubscriptionSettings settings)
{
return settings.GetQueueName().GetPoisonQueueName();
}

/// <summary>
/// Gets the poison queue name for the regular queue.
/// </summary>
/// <param name="regularQueueName"></param>
/// <returns></returns>
internal static string GetPoisonQueueName(this string regularQueueName)
{
if (string.IsNullOrEmpty(regularQueueName))
{
throw new ArgumentNullException(nameof(regularQueueName));
}

if (regularQueueName.EndsWith(PoisonQueueSuffix))
{
return regularQueueName;
}

return $"{regularQueueName}-{PoisonQueueSuffix}";
? QueueName.FromExchangeName(settings.ExchangeName)
: QueueName.Create(settings.QueueName);
}
}
}
83 changes: 67 additions & 16 deletions src/Lykke.RabbitMqBroker/Subscriber/ChannelFactoryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,56 @@

namespace Lykke.RabbitMqBroker.Subscriber;

using Failure = QueueConfigurationPreconditionFailure;

internal static class ChannelFactoryExtensions
{
private static IQueueConfigurationResult Execute<TResponse>(this Func<IModel> channelFactory, Func<IModel, TResponse> action)
private static IConfigurationResult Execute(
this Func<IModel> channelFactory,
Action<IModel> action)
{
using var channel = channelFactory();
try
{
action(channel);
return ConfigurationResult.Success();
}
catch (OperationInterruptedException ex)
{
if (ex.ShutdownReason is not null)
{
var errorCode = new ConfigurationErrorCode(ex.ShutdownReason.ReplyCode);
return ConfigurationResult.Failure(
new ConfigurationError(errorCode, ex.ShutdownReason.ReplyText));
}

throw;
}
}

private static IConfigurationResult<T> Execute<T>(
this Func<IModel> channelFactory,
Func<IModel, T> action)
{
using var channel = channelFactory();
try
{
var response = action(channel);
return new QueueConfigurationSuccess<TResponse>(response);
return ConfigurationResult<T>.Success(response);
}
catch (OperationInterruptedException ex)
{
if (ex.ShutdownReason is { ReplyCode: Constants.PreconditionFailed })
return new Failure(ex.ShutdownReason?.ReplyText ?? ex.Message);
if (ex.ShutdownReason is not null)
{
var errorCode = new ConfigurationErrorCode(ex.ShutdownReason.ReplyCode);
return ConfigurationResult<T>.Failure(
new ConfigurationError(errorCode, ex.ShutdownReason.ReplyText));
}

throw;
}
}

/// <summary>
/// Safe deletion of classic queue meaning there are no consumers and
/// Safe deletion of the queue meaning there are no consumers and
/// no messages in the queue.
/// </summary>
/// <param name="channelFactory"></param>
Expand All @@ -39,35 +67,58 @@ private static IQueueConfigurationResult Execute<TResponse>(this Func<IModel> ch
/// This method is intended to be used in case of precondition failure.
/// It works only for classic queues since for quorum queues `ifUnused`
/// and `ifEmpty` parameters are not supported so far.
/// It can't be used conditionally since there is no information about
/// queue type about to be deleted. Thereore it will always fail
/// for quorum queues.
/// </remarks>
public static IQueueConfigurationResult SafeDeleteClassicQueue(this Func<IModel> channelFactory, string queueName) =>
public static IConfigurationResult<uint> SafeDeleteQueue(this Func<IModel> channelFactory, string queueName) =>
channelFactory.Execute(ch => ch.QueueDelete(queueName, ifUnused: true, ifEmpty: true));

public static IQueueConfigurationResult DeclareQueue(
public static IConfigurationResult<QueueDeclareOk> DeclareQueue(
this Func<IModel> channelFactory,
QueueConfigurationOptions options,
Dictionary<string, object> args)
{
return channelFactory.Execute(ch => ch.QueueDeclare(
queue: options.QueueName,
queue: options.QueueName.ToString(),
durable: options.Durable,
exclusive: false,
autoDelete: options.AutoDelete,
arguments: args));
}

public static IQueueConfigurationResult BindQueue(
public static IConfigurationResult DeclareExchange(
this Func<IModel> channelFactory,
ExchangeConfigurationOptions options)
{
return channelFactory.Execute(ch =>
ch.ExchangeDeclare(
exchange: options.ExchangeName.ToString(),
type: options.ExchangeType,
durable: options.Durable,
autoDelete: options.AutoDelete)
);
}

/// <summary>
/// Binds a queue to an exchange
/// </summary>
/// <param name="channelFactory"></param>
/// <param name="options"></param>
/// <returns>
/// The name of the bound queue
/// </returns>
public static IConfigurationResult<QueueName> BindQueue(
this Func<IModel> channelFactory,
string queueName,
QueueConfigurationOptions options)
{
return channelFactory.Execute(ch =>
{
ch.QueueBind(
queue: queueName,
exchange: options.ExchangeName,
routingKey: options.RoutingKey);
return queueName;
queue: options.QueueName.ToString(),
exchange: options.ExistingExchangeName.ToString(),
routingKey: options.RoutingKey.ToString());
return options.QueueName;
});
}
}
55 changes: 55 additions & 0 deletions src/Lykke.RabbitMqBroker/Subscriber/ConfigurationResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Diagnostics;

using RabbitMQ.Client;

namespace Lykke.RabbitMqBroker.Subscriber;

[DebuggerDisplay("{Code}")]
internal record ConfigurationErrorCode(ushort Code)
{
public ushort Code { get; } = Code < 0 ? throw new ArgumentOutOfRangeException(nameof(Code)) : Code;
public static ConfigurationErrorCode PreconditionsFailed => new(Constants.PreconditionFailed);
public static ConfigurationErrorCode None => new(0);
}

[DebuggerDisplay("{Code}: {Message}")]
internal record ConfigurationError(ConfigurationErrorCode Code, string Message)
{
public static ConfigurationError None => new(ConfigurationErrorCode.None, string.Empty);
}

internal interface IConfigurationResult
{
bool IsSuccess => Error == ConfigurationError.None;
bool IsFailure => !IsSuccess;
ConfigurationError Error { get; }
}

internal interface IConfigurationResult<T> : IConfigurationResult
{
T Response { get; }
public void Deconstruct(out T response, out ConfigurationError error)
{
response = Response;
error = Error;
}
}

internal abstract record ConfigurationResultBase(ConfigurationError Error) : IConfigurationResult;

internal abstract record ConfigurationResultBase<T>(ConfigurationError Error, T Response) : ConfigurationResultBase(Error), IConfigurationResult<T>;

internal record ConfigurationResult(ConfigurationError Error) : ConfigurationResultBase(Error)
{
public static IConfigurationResult Success() => new ConfigurationResult(ConfigurationError.None);
public static IConfigurationResult Failure(ConfigurationError error) => new ConfigurationResult(error);
public static implicit operator ConfigurationResult(ConfigurationError error) => new(error);
}

internal record ConfigurationResult<T>(ConfigurationError Error, T Response) : ConfigurationResultBase<T>(Error, Response)
{
public static IConfigurationResult<T> Success(T response) => new ConfigurationResult<T>(ConfigurationError.None, response);
public static IConfigurationResult<T> Failure(ConfigurationError error) => new ConfigurationResult<T>(error, default);
public static implicit operator ConfigurationResult<T>(ConfigurationError error) => new(error, default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace Lykke.RabbitMqBroker.Subscriber;

internal static class ConfigurationResultChainingExtensions
{
public static IConfigurationResult Match(
this IConfigurationResult result,
Func<IConfigurationResult> onSuccess)
{
return result.IsSuccess ? onSuccess() : result;
}

public static IConfigurationResult<TResponse> Match<TResponse>(
this IConfigurationResult<TResponse> result,
Func<TResponse, IConfigurationResult<TResponse>> onSuccess)
{
return result.IsSuccess ? onSuccess(result.Response) : result;
}

public static IConfigurationResult<TResponse> Match<TResponse>(
this IConfigurationResult<TResponse> result,
Func<ConfigurationError, IConfigurationResult<TResponse>> onFailure)
{
return result.IsSuccess ? result : onFailure(result.Error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;

namespace Lykke.RabbitMqBroker.Subscriber;

internal static class ConfigurationResultExtensions
{
public static T Match<T>(
this IConfigurationResult result,
Func<T> onSuccess,
Func<ConfigurationError, T> onFailure)
{
return result.IsSuccess ? onSuccess() : onFailure(result.Error);
}

public static T Match<T, TResponse>(
this IConfigurationResult<TResponse> result,
Func<TResponse, T> onSuccess,
Func<ConfigurationError, T> onFailure)
{
return result.IsSuccess ? onSuccess(result.Response) : onFailure(result.Error);
}
}
16 changes: 16 additions & 0 deletions src/Lykke.RabbitMqBroker/Subscriber/DeadLetterExchangeName.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Diagnostics;

namespace Lykke.RabbitMqBroker.Subscriber;

[DebuggerDisplay("{Value}")]
public record DeadLetterExchangeName(string Value) : ExchangeName(Value)
{
private const string DeadLetterExchangeSuffix = "dlx";
public static new DeadLetterExchangeName Create(string value) =>
value.EndsWith(DeadLetterExchangeSuffix, StringComparison.InvariantCultureIgnoreCase)
? new(value)
: new($"{value}.{DeadLetterExchangeSuffix}");
public static DeadLetterExchangeName FromExchangeName(ExchangeName exchangeName) => Create(exchangeName.Value);
public override string ToString() => base.ToString();
}
10 changes: 10 additions & 0 deletions src/Lykke.RabbitMqBroker/Subscriber/ExchangeName.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Diagnostics;

namespace Lykke.RabbitMqBroker.Subscriber;

[DebuggerDisplay("{Value}")]
public record ExchangeName(string Value) : ResourceName(Validate(Value))
{
public static ExchangeName Create(string value) => new(value);
public override string ToString() => base.ToString();
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Lykke.RabbitMqBroker.Subscriber.MessageReadStrategies;

/// <param name="ExchangeName"> The name of the exchange to be declared. </param>
/// <param name="ExchangeType"> The type of the exchange to be declared. </param>
/// <param name="Durable"> Whether the exchange should be durable. </param>
/// <param name="AutoDelete"> Whether the exchange should be auto-deleted. </param>
internal sealed record ExchangeConfigurationOptions(ExchangeName ExchangeName, string ExchangeType, bool Durable, bool AutoDelete);
Loading
Loading