Skip to content

Commit

Permalink
Proposal for flexible Outbox PKs
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Sep 30, 2024
1 parent 698a9a2 commit ea65025
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@ public DbContextOutboxRepository(
SqlOutboxSettings settings,
SqlOutboxTemplate sqlOutboxTemplate,
TDbContext dbContext,
ISqlTransactionService transactionService)
: base(logger, settings, sqlOutboxTemplate, (SqlConnection)dbContext.Database.GetDbConnection(), transactionService)
ISqlTransactionService transactionService,
IOutboxMessageAdapter outboxMessageAdapter)
: base(
logger,
settings,
sqlOutboxTemplate,
(SqlConnection)dbContext.Database.GetDbConnection(),
transactionService,
outboxMessageAdapter)
{
DbContext = dbContext;
}
Expand Down
16 changes: 16 additions & 0 deletions src/SlimMessageBus.Host.Outbox.Sql/Adapters/GuidOutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class GuidOutboxMessage : OutboxMessage, IEquatable<GuidOutboxMessage>
{
public Guid Id { get; set; }

public override string ToString() => Id.ToString();

public override bool Equals(object obj)
=> Equals(obj as GuidOutboxMessage);

public bool Equals(GuidOutboxMessage other)
=> other is not null && Id.Equals(other.Id);

public override int GetHashCode() => Id.GetHashCode();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class GuidOutboxMessageAdapter(SqlOutboxTemplate sqlOutboxTemplate) : IOutboxMessageAdapter
{
public OutboxMessage Create()
=> new GuidOutboxMessage
{
Id = Guid.NewGuid()
};

public OutboxMessage Create(SqlDataReader reader, int idOrdinal)
=> new GuidOutboxMessage
{
Id = reader.GetGuid(idOrdinal)
};

public SqlParameter CreateIdSqlParameter(string parameterName, OutboxMessage outboxMessage)
{
var guidOutboxMessage = (GuidOutboxMessage)outboxMessage;
return new SqlParameter(parameterName, SqlDbType.UniqueIdentifier) { Value = guidOutboxMessage.Id };
}

public SqlParameter CreateIdsSqlParameter(string parameterName, IEnumerable<OutboxMessage> outboxMessages)
{
var guidIds = outboxMessages.Cast<GuidOutboxMessage>().Select(x => x.Id);
var idsString = string.Join(sqlOutboxTemplate.InIdsSeparator, guidIds);
return new SqlParameter(parameterName, SqlDbType.NVarChar) { Value = guidIds };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace SlimMessageBus.Host.Outbox.Sql;

/// <summary>
/// Allows to customize the outbox message creation and handling (PK type, etc).
/// </summary>
public interface IOutboxMessageAdapter
{
OutboxMessage Create();
OutboxMessage Create(SqlDataReader reader, int idOrdinal);
SqlParameter CreateIdSqlParameter(string parameterName, OutboxMessage outboxMessage);
SqlParameter CreateIdsSqlParameter(string parameterName, IEnumerable<OutboxMessage> outboxMessages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public static MessageBusBuilder AddOutboxUsingSql<TOutboxRepository>(this Messag
services.TryAddScoped<TOutboxRepository>();
services.TryAddScoped<ISqlTransactionService, SqlTransactionService>();
services.TryAddSingleton<IOutboxMessageAdapter, GuidOutboxMessageAdapter>();
services.Replace(ServiceDescriptor.Scoped<ISqlOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
services.Replace(ServiceDescriptor.Scoped<IOutboxRepository>(svp => svp.GetRequiredService<TOutboxRepository>()));
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/ISqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface ISqlOutboxRepository : IOutboxRepository
{
}
}
71 changes: 36 additions & 35 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
namespace SlimMessageBus.Host.Outbox.Sql;

public class SqlOutboxRepository : CommonSqlRepository, ISqlOutboxRepository
{
private readonly SqlOutboxTemplate _sqlTemplate;
private readonly IOutboxMessageAdapter _outboxMessageAdapter;
private readonly JsonSerializerOptions _jsonOptions;

protected SqlOutboxSettings Settings { get; }

public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService)
public SqlOutboxRepository(ILogger<SqlOutboxRepository> logger, SqlOutboxSettings settings, SqlOutboxTemplate sqlOutboxTemplate, SqlConnection connection, ISqlTransactionService transactionService, IOutboxMessageAdapter outboxMessageAdapter)
: base(logger, settings.SqlSettings, connection, transactionService)
{
_sqlTemplate = sqlOutboxTemplate;
_outboxMessageAdapter = outboxMessageAdapter;
_jsonOptions = new();
_jsonOptions.Converters.Add(new ObjectToInferredTypesConverter());

Settings = settings;
}

public OutboxMessage Create() => _outboxMessageAdapter.Create();

public async virtual Task Save(OutboxMessage message, CancellationToken token)
{
await EnsureConnection();

await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageInsert, cmd =>
{
cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id;
cmd.Parameters.Add(_outboxMessageAdapter.CreateIdSqlParameter("@Id", message));
cmd.Parameters.Add("@Timestamp", SqlDbType.DateTime2).Value = message.Timestamp;
cmd.Parameters.Add("@BusName", SqlDbType.NVarChar).Value = message.BusName;
cmd.Parameters.Add("@MessageType", SqlDbType.NVarChar).Value = message.MessageType;
Expand Down Expand Up @@ -52,9 +55,9 @@ public async Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string insta
return await ReadMessages(cmd, token).ConfigureAwait(false);
}

public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task AbortDelivery(IReadOnlyCollection<OutboxMessage> messages, CancellationToken token)
{
if (ids.Count == 0)
if (messages.Count == 0)
{
return;
}
Expand All @@ -65,19 +68,19 @@ public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken
_sqlTemplate.SqlOutboxMessageAbortDelivery,
cmd =>
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages));
},
token: token);

if (affected != ids.Count)
if (affected != messages.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected");
}
}

public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token)
public async Task UpdateToSent(IReadOnlyCollection<OutboxMessage> messages, CancellationToken token)
{
if (ids.Count == 0)
if (messages.Count == 0)
{
return;
}
Expand All @@ -88,21 +91,21 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken
_sqlTemplate.SqlOutboxMessageUpdateSent,
cmd =>
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages));
},
token: token);

if (affected != ids.Count)
if (affected != messages.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected");
}
}

private string ToIdsString(IReadOnlyCollection<Guid> ids) => string.Join(_sqlTemplate.InIdsSeparator, ids);

public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token)
public async Task IncrementDeliveryAttempt(IReadOnlyCollection<OutboxMessage> messages, int maxDeliveryAttempts, CancellationToken token)
{
if (ids.Count == 0)
if (messages.Count == 0)
{
return;
}
Expand All @@ -118,14 +121,14 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma
_sqlTemplate.SqlOutboxMessageIncrementDeliveryAttempt,
cmd =>
{
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.Add(_outboxMessageAdapter.CreateIdsSqlParameter("@Ids", messages));
cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts);
},
token: token);

if (affected != ids.Count)
if (affected != messages.Count)
{
throw new MessageBusException($"The number of affected rows was {affected}, but {ids.Count} was expected");
throw new MessageBusException($"The number of affected rows was {affected}, but {messages.Count} was expected");
}
}

Expand Down Expand Up @@ -185,24 +188,22 @@ private async Task<IReadOnlyCollection<OutboxMessage>> ReadMessages(SqlCommand c
var items = new List<OutboxMessage>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(idOrdinal);
var headers = reader.IsDBNull(headersOrdinal) ? null : reader.GetString(headersOrdinal);
var message = new OutboxMessage
{
Id = id,
Timestamp = reader.GetDateTime(timestampOrdinal),
BusName = reader.GetString(busNameOrdinal),
MessageType = reader.GetString(typeOrdinal),
MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value,
Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions),
Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal),
InstanceId = reader.GetString(instanceIdOrdinal),
LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal),
LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal),
DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal),
DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal),
DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal)
};

var message = _outboxMessageAdapter.Create(reader, idOrdinal);

message.Timestamp = reader.GetDateTime(timestampOrdinal);
message.BusName = reader.GetString(busNameOrdinal);
message.MessageType = reader.GetString(typeOrdinal);
message.MessagePayload = reader.GetSqlBinary(payloadOrdinal).Value;
message.Headers = headers == null ? null : JsonSerializer.Deserialize<IDictionary<string, object>>(headers, _jsonOptions);
message.Path = reader.IsDBNull(pathOrdinal) ? null : reader.GetString(pathOrdinal);
message.InstanceId = reader.GetString(instanceIdOrdinal);
message.LockInstanceId = reader.IsDBNull(lockInstanceIdOrdinal) ? null : reader.GetString(lockInstanceIdOrdinal);
message.LockExpiresOn = reader.IsDBNull(lockExpiresOnOrdinal) ? null : reader.GetDateTime(lockExpiresOnOrdinal);
message.DeliveryAttempt = reader.GetInt32(deliveryAttemptOrdinal);
message.DeliveryComplete = reader.GetBoolean(deliveryCompleteOrdinal);
message.DeliveryAborted = reader.GetBoolean(deliveryAbortedOrdinal);

items.Add(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
?? throw new PublishMessageBusException($"The {busMaster.Name} bus has no configured serializer, so it cannot be used with the outbox plugin");

// Add message to the database, do not call next()
var outboxMessage = new OutboxMessage
{
BusName = busMaster.Name,
Headers = context.Headers,
Path = context.Path,
MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType),
MessagePayload = messagePayload,
InstanceId = _instanceIdProvider.GetInstanceId()
};
var outboxMessage = _outboxRepository.Create();
outboxMessage.BusName = busMaster.Name;
outboxMessage.Headers = context.Headers;
outboxMessage.Path = context.Path;
outboxMessage.MessageType = _outboxSettings.MessageTypeResolver.ToName(messageType);
outboxMessage.MessagePayload = messagePayload;
outboxMessage.InstanceId = _instanceIdProvider.GetInstanceId();

await _outboxRepository.Save(outboxMessage, context.CancellationToken);

// a message was sent, notify outbox service to poll on dispose (post transaction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

public interface IOutboxRepository
{
OutboxMessage Create();
Task Save(OutboxMessage message, CancellationToken token);
Task<IReadOnlyCollection<OutboxMessage>> LockAndSelect(string instanceId, int batchSize, bool tableLock, TimeSpan lockDuration, CancellationToken token);
Task AbortDelivery (IReadOnlyCollection<Guid> ids, CancellationToken token);
Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken token);
Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token);
Task AbortDelivery(IReadOnlyCollection<OutboxMessage> messages, CancellationToken token);
Task UpdateToSent(IReadOnlyCollection<OutboxMessage> messages, CancellationToken token);
Task IncrementDeliveryAttempt(IReadOnlyCollection<OutboxMessage> messages, int maxDeliveryAttempts, CancellationToken token);
Task DeleteSent(DateTime olderThan, CancellationToken token);
Task<bool> RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

public class OutboxMessage
{
public Guid Id { get; set; } = Guid.NewGuid();
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
public string BusName { get; set; }
public string MessageType { get; set; }
Expand Down
Loading

0 comments on commit ea65025

Please sign in to comment.