From ea650254b393e1b8ca966a009465f5b32b9723a1 Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Tue, 1 Oct 2024 00:25:58 +0200 Subject: [PATCH] Proposal for flexible Outbox PKs Signed-off-by: Tomasz Maruszak --- .../DbContextOutboxRepository.cs | 11 ++- .../Adapters/GuidOutboxMessage.cs | 16 +++++ .../Adapters/GuidOutboxMessageAdapter.cs | 29 ++++++++ .../Adapters/IOutboxMessageAdapter.cs | 12 ++++ .../MessageBusBuilderExtensions.cs | 2 + .../ISqlOutboxRepository.cs | 2 +- .../SqlOutboxRepository.cs | 71 ++++++++++--------- .../OutboxForwardingPublishInterceptor.cs | 17 +++-- .../Repositories/IOutboxRepository.cs | 7 +- .../Repositories/OutboxMessage.cs | 1 - .../Services/OutboxSendingTask.cs | 40 +++++------ .../BaseSqlOutboxRepositoryTest.cs | 8 ++- .../SqlOutboxRepositoryTests.cs | 47 ++++++------ .../Services/OutboxSendingTaskTests.cs | 29 ++++---- 14 files changed, 179 insertions(+), 113 deletions(-) create mode 100644 src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessage.cs create mode 100644 src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessageAdapter.cs create mode 100644 src/SlimMessageBus.Host.Outbox.Sql/Adapters/IOutboxMessageAdapter.cs diff --git a/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs index d9deb1b7..9107b405 100644 --- a/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.DbContext/DbContextOutboxRepository.cs @@ -16,8 +16,15 @@ public DbContextOutboxRepository( SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, TDbContext dbContext, - ISqlTransactionService transactionService) - : base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService) + ISqlTransactionService transactionService, + IOutboxMessageAdapter outboxMessageAdapter) + : base( + logger, + settings, + sqlOutboxTemplate, + (SqlConnection)dbContext.Database.GetDbConnection(), + transactionService, + outboxMessageAdapter) { DbContext = dbContext; } diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessage.cs b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessage.cs new file mode 100644 index 00000000..ec7da834 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessage.cs @@ -0,0 +1,16 @@ +namespace SlimMessageBus.Host.Outbox.Sql; + +public class GuidOutboxMessage : OutboxMessage, IEquatable +{ + public Guid Id { get; set; } + + public override string ToString() => Id.ToString(); + + public override bool Equals(object obj) + => Equals(obj as GuidOutboxMessage); + + public bool Equals(GuidOutboxMessage other) + => other is not null && Id.Equals(other.Id); + + public override int GetHashCode() => Id.GetHashCode(); +} diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessageAdapter.cs b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessageAdapter.cs new file mode 100644 index 00000000..4c383a0c --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessageAdapter.cs @@ -0,0 +1,29 @@ +namespace SlimMessageBus.Host.Outbox.Sql; + +public class GuidOutboxMessageAdapter(SqlOutboxTemplate sqlOutboxTemplate) : IOutboxMessageAdapter +{ + public OutboxMessage Create() + => new GuidOutboxMessage + { + Id = Guid.NewGuid() + }; + + public OutboxMessage Create(SqlDataReader reader, int idOrdinal) + => new GuidOutboxMessage + { + Id = reader.GetGuid(idOrdinal) + }; + + public SqlParameter CreateIdSqlParameter(string parameterName, OutboxMessage outboxMessage) + { + var guidOutboxMessage = (GuidOutboxMessage)outboxMessage; + return new SqlParameter(parameterName, SqlDbType.UniqueIdentifier) { Value = guidOutboxMessage.Id }; + } + + public SqlParameter CreateIdsSqlParameter(string parameterName, IEnumerable outboxMessages) + { + var guidIds = outboxMessages.Cast().Select(x => x.Id); + var idsString = string.Join(sqlOutboxTemplate.InIdsSeparator, guidIds); + return new SqlParameter(parameterName, SqlDbType.NVarChar) { Value = guidIds }; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Adapters/IOutboxMessageAdapter.cs b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/IOutboxMessageAdapter.cs new file mode 100644 index 00000000..123bc3a0 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.Sql/Adapters/IOutboxMessageAdapter.cs @@ -0,0 +1,12 @@ +namespace SlimMessageBus.Host.Outbox.Sql; + +/// +/// Allows to customize the outbox message creation and handling (PK type, etc). +/// +public interface IOutboxMessageAdapter +{ + OutboxMessage Create(); + OutboxMessage Create(SqlDataReader reader, int idOrdinal); + SqlParameter CreateIdSqlParameter(string parameterName, OutboxMessage outboxMessage); + SqlParameter CreateIdsSqlParameter(string parameterName, IEnumerable outboxMessages); +} diff --git a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs index 19676168..dbd59e59 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/Configuration/MessageBusBuilderExtensions.cs @@ -36,6 +36,8 @@ public static MessageBusBuilder AddOutboxUsingSql(this Messag services.TryAddScoped(); services.TryAddScoped(); + services.TryAddSingleton(); + services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); services.Replace(ServiceDescriptor.Scoped(svp => svp.GetRequiredService())); diff --git a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs index cb124ccb..efe6f0c9 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs @@ -2,4 +2,4 @@ public interface ISqlOutboxRepository : IOutboxRepository { -} \ No newline at end of file +} diff --git a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs index 6501bb99..83b8f5aa 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs @@ -1,29 +1,32 @@ namespace SlimMessageBus.Host.Outbox.Sql; - public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository { private readonly SqlOutboxTemplate _sqlTemplate; + private readonly IOutboxMessageAdapter _outboxMessageAdapter; private readonly JsonSerializerOptions _jsonOptions; protected SqlOutboxSettings Settings { get; } - public SqlOutboxRepository(ILogger logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService) + public SqlOutboxRepository(ILogger logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService, IOutboxMessageAdapter outboxMessageAdapter) : base(logger, settings.SqlSettings, connection, transactionService) { _sqlTemplate = sqlOutboxTemplate; + _outboxMessageAdapter = outboxMessageAdapter; _jsonOptions = new(); _jsonOptions.Converters.Add(new ObjectToInferredTypesConverter()); Settings = settings; } + public OutboxMessage Create() => _outboxMessageAdapter.Create(); + public async virtual Task Save(OutboxMessage message, CancellationToken token) { await EnsureConnection(); await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageInsert, cmd => { - cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id; + cmd.Parameters.Add(_outboxMessageAdapter.CreateIdSqlParameter("@Id", message)); cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = message.Timestamp; cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = message.BusName; cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = message.MessageType; @@ -52,9 +55,9 @@ public async Task> LockAndSelect(string insta return await ReadMessages(cmd, token).ConfigureAwait(false); } - public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken token) + public async Task AbortDelivery(IReadOnlyCollection messages, CancellationToken token) { - if (ids.Count == 0) + if (messages.Count == 0) { return; } @@ -65,19 +68,19 @@ public async Task AbortDelivery(IReadOnlyCollection ids, CancellationToken _sqlTemplate.SqlOutboxMessageAbortDelivery, cmd => { - cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); + cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages)); }, token: token); - if (affected != ids.Count) + if (affected != messages.Count) { - throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected"); + throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected"); } } - public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token) + public async Task UpdateToSent(IReadOnlyCollection messages, CancellationToken token) { - if (ids.Count == 0) + if (messages.Count == 0) { return; } @@ -88,21 +91,21 @@ public async Task UpdateToSent(IReadOnlyCollection ids, CancellationToken _sqlTemplate.SqlOutboxMessageUpdateSent, cmd => { - cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); + cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages)); }, token: token); - if (affected != ids.Count) + if (affected != messages.Count) { - throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected"); + throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected"); } } private string ToIdsString(IReadOnlyCollection ids) => string.Join(_sqlTemplate.InIdsSeparator, ids); - public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token) + public async Task IncrementDeliveryAttempt(IReadOnlyCollection messages, int maxDeliveryAttempts, CancellationToken token) { - if (ids.Count == 0) + if (messages.Count == 0) { return; } @@ -118,14 +121,14 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int ma _sqlTemplate.SqlOutboxMessageIncrementDeliveryAttempt, cmd => { - cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids)); + cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages)); cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts); }, token: token); - if (affected != ids.Count) + if (affected != messages.Count) { - throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected"); + throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected"); } } @@ -185,24 +188,22 @@ private async Task> ReadMessages(SqlCommand c var items = new List(); while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) { - var id = reader.GetGuid(idOrdinal); var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal); - var message = new OutboxMessage - { - Id = id, - Timestamp = reader.GetDateTime(timestampOrdinal), - BusName = reader.GetString(busNameOrdinal), - MessageType = reader.GetString(typeOrdinal), - MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value, - Headers = headers == null ? null : JsonSerializer.Deserialize>(headers, _jsonOptions), - Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal), - InstanceId = reader.GetString(instanceIdOrdinal), - LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal), - LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal), - DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal), - DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal), - DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal) - }; + + var message = _outboxMessageAdapter.Create(reader, idOrdinal); + + message.Timestamp = reader.GetDateTime(timestampOrdinal); + message.BusName = reader.GetString(busNameOrdinal); + message.MessageType = reader.GetString(typeOrdinal); + message.MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value; + message.Headers = headers == null ? null : JsonSerializer.Deserialize>(headers, _jsonOptions); + message.Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal); + message.InstanceId = reader.GetString(instanceIdOrdinal); + message.LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal); + message.LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal); + message.DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal); + message.DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal); + message.DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal); items.Add(message); } diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs index 40921b8d..91e63f1c 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxForwardingPublishInterceptor.cs @@ -68,15 +68,14 @@ public async Task OnHandle(T message, Func next, IProducerContext context) ?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin"); // Add message to the database, do not call next() - var outboxMessage = new OutboxMessage - { - BusName = busMaster.Name, - Headers = context.Headers, - Path = context.Path, - MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType), - MessagePayload = messagePayload, - InstanceId = _instanceIdProvider.GetInstanceId() - }; + var outboxMessage = _outboxRepository.Create(); + outboxMessage.BusName = busMaster.Name; + outboxMessage.Headers = context.Headers; + outboxMessage.Path = context.Path; + outboxMessage.MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType); + outboxMessage.MessagePayload = messagePayload; + outboxMessage.InstanceId = _instanceIdProvider.GetInstanceId(); + await _outboxRepository.Save(outboxMessage, context.CancellationToken); // a message was sent, notify outbox service to poll on dispose (post transaction) diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs index a8772916..61cc82f4 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/IOutboxRepository.cs @@ -2,11 +2,12 @@ public interface IOutboxRepository { + OutboxMessage Create(); Task Save(OutboxMessage message, CancellationToken token); Task> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token); - Task AbortDelivery (IReadOnlyCollection ids, CancellationToken token); - Task UpdateToSent(IReadOnlyCollection ids, CancellationToken token); - Task IncrementDeliveryAttempt(IReadOnlyCollection ids, int maxDeliveryAttempts, CancellationToken token); + Task AbortDelivery(IReadOnlyCollection messages, CancellationToken token); + Task UpdateToSent(IReadOnlyCollection messages, CancellationToken token); + Task IncrementDeliveryAttempt(IReadOnlyCollection messages, int maxDeliveryAttempts, CancellationToken token); Task DeleteSent(DateTime olderThan, CancellationToken token); Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token); } diff --git a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs index 01aedcdb..9f55563d 100644 --- a/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs +++ b/src/SlimMessageBus.Host.Outbox/Repositories/OutboxMessage.cs @@ -2,7 +2,6 @@ public class OutboxMessage { - public Guid Id { get; set; } = Guid.NewGuid(); public DateTime Timestamp { get; set; } = DateTime.UtcNow; public string BusName { get; set; } public string MessageType { get; set; } diff --git a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs index c0b32c62..423c51b8 100644 --- a/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs @@ -256,7 +256,7 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR var runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize; var count = 0; - var abortedIds = new List(_outboxSettings.PollBatchSize); + var abortedMessages = new List(_outboxSettings.PollBatchSize); foreach (var busGroup in outboxMessages.GroupBy(x => x.BusName)) { var busName = busGroup.Key; @@ -268,14 +268,14 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR { if (bus == null) { - _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); + _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); } else { - _logger.LogWarning("Bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus does not support bulk processing. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); + _logger.LogWarning("Bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus does not support bulk processing. The message will be skipped.", outboxMessage, outboxMessage.MessageType, outboxMessage.Path, outboxMessage.BusName); } - abortedIds.Add(outboxMessage.Id); + abortedMessages.Add(outboxMessage); } continue; @@ -295,13 +295,13 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR var messageType = _outboxSettings.MessageTypeResolver.ToType(outboxMessage.MessageType); if (messageType == null) { - abortedIds.Add(outboxMessage.Id); - _logger.LogError("Outbox message with Id {Id} - the MessageType {MessageType} is not recognized. The type might have been renamed or moved namespaces.", outboxMessage.Id, outboxMessage.MessageType); + abortedMessages.Add(outboxMessage); + _logger.LogError("Outbox message with Id {Id} - the MessageType {MessageType} is not recognized. The type might have been renamed or moved namespaces.", outboxMessage, outboxMessage.MessageType); return null; } var message = bus.Serializer.Deserialize(messageType, outboxMessage.MessagePayload); - return new OutboxBulkMessage(outboxMessage.Id, message, messageType, outboxMessage.Headers ?? new Dictionary()); + return new OutboxBulkMessage(outboxMessage, message, messageType, outboxMessage.Headers ?? new Dictionary()); }) .Where(x => x != null) .Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize); @@ -315,9 +315,9 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR } } - if (abortedIds.Count > 0) + if (abortedMessages.Count > 0) { - await outboxRepository.AbortDelivery(abortedIds, cancellationToken); + await outboxRepository.AbortDelivery(abortedMessages, cancellationToken); } return (runAgain, count); @@ -335,19 +335,19 @@ async internal Task SendMessages(IServiceProvider serviceProvider, IOutboxR return (false, 0); } - var updatedIds = results.Dispatched.Select(x => x.Id).ToHashSet(); - await outboxRepository.UpdateToSent(updatedIds, CancellationToken.None).ConfigureAwait(false); + var updatedMessages = results.Dispatched.Select(x => x.OutboxMessage).ToHashSet(); + await outboxRepository.UpdateToSent(updatedMessages, CancellationToken.None).ConfigureAwait(false); - if (updatedIds.Count != batch.Count) + if (updatedMessages.Count != batch.Count) { - var failedIds = batch.Where(x => !updatedIds.Contains(x.Id!)).Select(x => x.Id).ToHashSet(); - await outboxRepository.IncrementDeliveryAttempt(failedIds, _outboxSettings.MaxDeliveryAttempts, CancellationToken.None).ConfigureAwait(false); + var failedMessages = batch.Where(x => !updatedMessages.Contains(x.OutboxMessage)).Select(x => x.OutboxMessage).ToHashSet(); + await outboxRepository.IncrementDeliveryAttempt(failedMessages, _outboxSettings.MaxDeliveryAttempts, CancellationToken.None).ConfigureAwait(false); - _logger.LogDebug("Failed to publish {MessageCount} messages in a batch of {BatchSize} to pathGroup {Path} on {BusName} bus", failedIds.Count, batch.Count, path, busName); - return (false, updatedIds.Count); + _logger.LogDebug("Failed to publish {MessageCount} messages in a batch of {BatchSize} to pathGroup {Path} on {BusName} bus", failedMessages.Count, batch.Count, path, busName); + return (false, updatedMessages.Count); } - return (true, updatedIds.Count); + return (true, updatedMessages.Count); } private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, string name) @@ -369,12 +369,12 @@ private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus public record OutboxBulkMessage : BulkMessageEnvelope { - public Guid Id { get; } + public OutboxMessage OutboxMessage { get; } - public OutboxBulkMessage(Guid id, object message, Type messageType, IDictionary headers) + public OutboxBulkMessage(OutboxMessage outboxMessage, object message, Type messageType, IDictionary headers) : base(message, messageType, headers) { - Id = id; + OutboxMessage = outboxMessage; } } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs index cb51dc21..0fde5422 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/BaseSqlOutboxRepositoryTest.cs @@ -9,7 +9,8 @@ public class BaseSqlOutboxRepositoryTest : BaseSqlTest protected SqlOutboxSettings _settings; protected SqlOutboxRepository _target; protected SqlOutboxTemplate _template; - protected ISqlTransactionService _transactionService; + protected ISqlTransactionService _transactionService; + protected IOutboxMessageAdapter _outboxMessageAdapter; public override async Task InitializeAsync() { @@ -18,8 +19,9 @@ public override async Task InitializeAsync() _settings = new SqlOutboxSettings(); _connection = new SqlConnection(GetConnectionString()); _transactionService = new SqlTransactionService(_connection, _settings.SqlSettings); - _template = new SqlOutboxTemplate(_settings); - _target = new SqlOutboxRepository(NullLogger.Instance, _settings, _template, _connection, _transactionService); + _template = new SqlOutboxTemplate(_settings); + _outboxMessageAdapter = new GuidOutboxMessageAdapter(_template); + _target = new SqlOutboxRepository(NullLogger.Instance, _settings, _template, _connection, _transactionService, _outboxMessageAdapter); _migrationService = new SqlOutboxMigrationService(NullLogger.Instance, _target, _transactionService, _settings); await _migrationService.Migrate(CancellationToken.None); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs index 381e1199..0df6bbae 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Sql.Test/SqlOutboxRepositoryTests.cs @@ -27,14 +27,14 @@ public async Task ShouldUpdateStatus() { // arrange var seed = await SeedOutbox(5); - var expected = seed.Select(x => x.Id).Take(3).ToList(); + var expected = seed.Take(3).ToList(); // act await _target.AbortDelivery(expected, CancellationToken.None); var messages = await _target.GetAllMessages(CancellationToken.None); // assert - var actual = messages.Where(x => x.DeliveryAborted).Select(x => x.Id).ToList(); + var actual = messages.Where(x => x.DeliveryAborted).ToList(); actual.Should().BeEquivalentTo(expected); } } @@ -105,15 +105,15 @@ public async Task AbortedMessages_AreNotIncluded() { // arrange var seed = await SeedOutbox(5); - var abortedIds = seed.Select(x => x.Id).Take(3).ToList(); + var aborted = seed.Take(3).ToList(); - await _target.AbortDelivery(abortedIds, CancellationToken.None); + await _target.AbortDelivery(aborted, CancellationToken.None); // act var actual = await _target.LockAndSelect("123", 10, false, TimeSpan.FromMinutes(1), CancellationToken.None); // assert - actual.Select(x => x.Id).Should().NotContain(abortedIds); + actual.Should().NotContain(aborted); } [Fact] @@ -121,15 +121,15 @@ public async Task SentMessages_AreNotIncluded() { // arrange var seed = await SeedOutbox(5); - var sentIds = seed.Select(x => x.Id).Take(3).ToList(); + var sent = seed.Take(3).ToList(); - await _target.UpdateToSent(sentIds, CancellationToken.None); + await _target.UpdateToSent(sent, CancellationToken.None); // act var actual = await _target.LockAndSelect("123", 10, false, TimeSpan.FromMinutes(1), CancellationToken.None); // assert - actual.Select(x => x.Id).Should().NotContain(sentIds); + actual.Should().NotContain(sent); } } @@ -141,17 +141,17 @@ public async Task WithinMaxAttempts_DoesNotAbortDelivery() // arrange const int maxAttempts = 2; var seed = await SeedOutbox(5); - var ids = seed.Select(x => x.Id).Take(3).ToList(); + var messages = seed.Take(3).ToList(); // act - await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); - var messages = await _target.GetAllMessages(CancellationToken.None); + await _target.IncrementDeliveryAttempt(messages, maxAttempts, CancellationToken.None); + var allMessages = await _target.GetAllMessages(CancellationToken.None); // assert - messages.Should().OnlyContain(x => !x.DeliveryComplete); - messages.Should().OnlyContain(x => !x.DeliveryAborted); - messages.Where(x => !ids.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 0); - messages.Where(x => ids.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 1); + allMessages.Should().OnlyContain(x => !x.DeliveryComplete); + allMessages.Should().OnlyContain(x => !x.DeliveryAborted); + allMessages.Where(x => !messages.Contains(x)).Should().OnlyContain(x => x.DeliveryAttempt == 0); + allMessages.Where(x => messages.Contains(x)).Should().OnlyContain(x => x.DeliveryAttempt == 1); } [Fact] @@ -160,21 +160,21 @@ public async Task BreachingMaxAttempts_AbortsDelivery() // arrange const int maxAttempts = 1; var seed = await SeedOutbox(5); - var ids = seed.Select(x => x.Id).Take(3).ToList(); + var ids = seed.Take(3).ToList(); // act await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); await _target.IncrementDeliveryAttempt(ids, maxAttempts, CancellationToken.None); - var messages = await _target.GetAllMessages(CancellationToken.None); + var allMessages = await _target.GetAllMessages(CancellationToken.None); // assert - messages.Should().OnlyContain(x => !x.DeliveryComplete); + allMessages.Should().OnlyContain(x => !x.DeliveryComplete); - var attempted = messages.Where(x => ids.Contains(x.Id)).ToList(); + var attempted = allMessages.Where(x => ids.Contains(x)).ToList(); attempted.Should().OnlyContain(x => x.DeliveryAttempt == 2); attempted.Should().OnlyContain(x => x.DeliveryAborted); - var notAttempted = messages.Where(x => !ids.Contains(x.Id)).ToList(); + var notAttempted = allMessages.Where(x => !ids.Contains(x)).ToList(); notAttempted.Should().OnlyContain(x => x.DeliveryAttempt == 0); notAttempted.Should().OnlyContain(x => !x.DeliveryAborted); } @@ -187,14 +187,14 @@ public async Task ShouldUpdateStatus() { // arrange var seed = await SeedOutbox(5); - var expected = seed.Select(x => x.Id).Take(3).ToList(); + var expected = seed.Take(3).ToList(); // act await _target.UpdateToSent(expected, CancellationToken.None); var messages = await _target.GetAllMessages(CancellationToken.None); // assert - var actual = messages.Where(x => x.DeliveryComplete).Select(x => x.Id).ToList(); + var actual = messages.Where(x => x.DeliveryComplete).ToList(); actual.Should().BeEquivalentTo(expected); } } @@ -210,7 +210,6 @@ public async Task WithinLock_ExtendsLockTimeout() await SeedOutbox(batchSize); var lockedItems = await _target.LockAndSelect(instanceId, batchSize, true, TimeSpan.FromSeconds(10), CancellationToken.None); - var lockedIds = lockedItems.Select(x => x.Id).ToList(); var before = await _target.GetAllMessages(CancellationToken.None); var originalLock = before.Min(x => x.LockExpiresOn); @@ -220,7 +219,7 @@ public async Task WithinLock_ExtendsLockTimeout() // assert var after = await _target.GetAllMessages(CancellationToken.None); - var actual = after.Where(x => lockedIds.Contains(x.Id)); + var actual = after.Where(x => lockedItems.Contains(x)); actual.Should().OnlyContain(x => x.LockExpiresOn > originalLock); } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs index aa2a71e2..9a3e7c2e 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.Test/Services/OutboxSendingTaskTests.cs @@ -31,8 +31,8 @@ public async Task DispatchBatch_ShouldReturnSuccess_WhenAllMessagesArePublished( { var batch = new List { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(new OutboxMessage(), "Message1", typeof(string), new Dictionary()), + new(new OutboxMessage(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); var results = new ProduceToTransportBulkResult(batch, null); @@ -51,8 +51,8 @@ public async Task DispatchBatch_ShouldReturnFailure_WhenNotAllMessagesArePublish { var batch = new List { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(new OutboxMessage(), "Message1", typeof(string), new Dictionary()), + new(new OutboxMessage(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); var results = new ProduceToTransportBulkResult([batch.First()], null); @@ -71,8 +71,8 @@ public async Task DispatchBatch_ShouldIncrementDeliveryAttempts_WhenNotAllMessag { var batch = new List { - new(Guid.NewGuid(), "Message1", typeof(string), new Dictionary()), - new(Guid.NewGuid(), "Message2", typeof(string), new Dictionary()) + new(new OutboxMessage(), "Message1", typeof(string), new Dictionary()), + new(new OutboxMessage(), "Message2", typeof(string), new Dictionary()) }.AsReadOnly(); var results = new ProduceToTransportBulkResult([batch.First()], null); @@ -82,7 +82,7 @@ public async Task DispatchBatch_ShouldIncrementDeliveryAttempts_WhenNotAllMessag await _sut.DispatchBatch(_outboxRepositoryMock.Object, _producerMock.Object, _messageBusTargetMock.Object, batch, "busName", "path", CancellationToken.None); - _outboxRepositoryMock.Verify(x => x.IncrementDeliveryAttempt(It.Is>(ids => ids.Contains(batch[1].Id)), _outboxSettings.MaxDeliveryAttempts, CancellationToken.None), Times.Once); + _outboxRepositoryMock.Verify(x => x.IncrementDeliveryAttempt(It.Is>(messages => messages.Contains(batch[1].OutboxMessage)), _outboxSettings.MaxDeliveryAttempts, CancellationToken.None), Times.Once); } } @@ -139,7 +139,7 @@ public async Task ProcessMessages_ShouldReturnCorrectValues_WhenOutboxMessagesPr // Assert result.RunAgain.Should().BeFalse(); result.Count.Should().Be(30); - _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Exactly(3)); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Exactly(3)); } [Fact] @@ -178,8 +178,8 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised() outboxMessages[0].BusName = null; outboxMessages[7].BusName = null; - var knownBusCount = outboxMessages.Count(x => x.BusName != null); - + var knownBusCount = outboxMessages.Count(x => x.BusName != null); + _mockMessageBusTarget.SetupGet(x => x.Target).Returns((IMessageBusProducer)null); _mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny())).Returns(_mockMasterMessageBus.Object); @@ -197,8 +197,8 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised() var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, CancellationToken.None); // Assert - _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); - _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); result.RunAgain.Should().BeFalse(); result.Count.Should().Be(knownBusCount); } @@ -231,8 +231,8 @@ public async Task ProcessMessages_ShouldAbortDelivery_WhenMessageTypeIsNotRecogn var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, CancellationToken.None); // Assert - _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); - _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.AbortDelivery(It.IsAny>(), It.IsAny()), Times.Once); + _mockOutboxRepository.Verify(x => x.UpdateToSent(It.IsAny>(), It.IsAny()), Times.Once); result.RunAgain.Should().BeFalse(); result.Count.Should().Be(knownMessageCount); } @@ -244,7 +244,6 @@ private static List CreateOutboxMessages(int count) .Select( _ => new OutboxMessage { - Id = Guid.NewGuid(), MessageType = "TestType", MessagePayload = [], BusName = "TestBus",