Skip to content

Commit

Permalink
feat: SQS queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
the-avid-engineer committed Jan 21, 2024
1 parent 6338b6c commit e24368f
Show file tree
Hide file tree
Showing 14 changed files with 501 additions and 5 deletions.
7 changes: 7 additions & 0 deletions EntityDb.sln
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EntityDb.Json", "src\Entity
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EntityDb.Provisioner", "src\EntityDb.Provisioner\EntityDb.Provisioner.csproj", "{26FCDB9D-0DE3-4BB9-858D-3E2C3EF763E3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EntityDb.Aws", "src\EntityDb.Aws\EntityDb.Aws.csproj", "{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -95,6 +97,10 @@ Global
{26FCDB9D-0DE3-4BB9-858D-3E2C3EF763E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{26FCDB9D-0DE3-4BB9-858D-3E2C3EF763E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{26FCDB9D-0DE3-4BB9-858D-3E2C3EF763E3}.Release|Any CPU.Build.0 = Release|Any CPU
{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -111,6 +117,7 @@ Global
{FA2AD2E9-84DA-4667-BF46-140B0B050563} = {92484C44-2754-4C1D-BD46-98D83E4020EE}
{4936FFE0-98E5-43A2-89C9-0415A13CAA9B} = {ABACFBCC-B59F-4616-B6CC-99C37AEC8960}
{26FCDB9D-0DE3-4BB9-858D-3E2C3EF763E3} = {ABACFBCC-B59F-4616-B6CC-99C37AEC8960}
{71EF2C8C-F6BF-4CE3-91A4-62F8C7C9FB8B} = {ABACFBCC-B59F-4616-B6CC-99C37AEC8960}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E9D288EE-9351-4018-ABE8-B0968AEB0465}
Expand Down
16 changes: 16 additions & 0 deletions src/EntityDb.Aws/EntityDb.Aws.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageTags>EntityDb EventSourcing EventStreaming DDD CQRS</PackageTags>
<Description>A partial set of implementations of the EntityDb common layer.</Description>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\EntityDb.Common\EntityDb.Common.csproj"/>
</ItemGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SQS" Version="3.7.300.39" />
</ItemGroup>

</Project>
43 changes: 43 additions & 0 deletions src/EntityDb.Aws/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using EntityDb.Aws.Sources.Processors.Queues;
using EntityDb.Common.Sources.Processors.Queues;
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics.CodeAnalysis;

namespace EntityDb.Aws.Extensions;

/// <summary>
/// Extensions for service collections.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Registers a queue for processing sources as they are committed.
/// For test mode, the queue is not actually a queue and will immediately process the source.
/// For non-test mode, the implementation of ISourceProcessorQueue uses a buffer
/// block to receive messages, enqueue them to sqs, and then background-only service
/// will dequeue them from sqs and process them as normal.
/// </summary>
/// <param name="serviceCollection">The service collection.</param>
/// <param name="testMode">Whether or not to run in test mode.</param>
[ExcludeFromCodeCoverage(Justification = "Tests are only meant to run in test mode.")]
public static void AddSqsSourceProcessorQueue(this IServiceCollection serviceCollection,
bool testMode)
{
if (testMode)
{
serviceCollection.AddSingleton<ISourceProcessorQueue, TestModeSourceProcessorQueue>();
}
else
{
serviceCollection.AddSingleton<SqsOutboxSourceProcessorQueue>();

serviceCollection.AddSingleton<ISourceProcessorQueue>(serviceProvider =>
serviceProvider.GetRequiredService<SqsOutboxSourceProcessorQueue>());

serviceCollection.AddHostedService(serviceProvider =>
serviceProvider.GetRequiredService<SqsOutboxSourceProcessorQueue>());

serviceCollection.AddHostedService<SqsInboxSourceProcessorQueue>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using Amazon.SQS.Model;
using EntityDb.Abstractions;
using EntityDb.Abstractions.Sources;
using EntityDb.Common.Envelopes;
using EntityDb.Common.Sources;
using EntityDb.Common.Sources.Processors;
using EntityDb.Common.TypeResolvers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;

namespace EntityDb.Aws.Sources.Processors.Queues;

[ExcludeFromCodeCoverage(Justification = "Not used in tests.")]
internal sealed class SqsInboxSourceProcessorQueue : BackgroundService
{
private readonly ILogger<SqsInboxSourceProcessorQueue> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ITypeResolver _typeResolver;
private readonly SqsSourceProcessorQueueOptions _options;

public SqsInboxSourceProcessorQueue
(
ILogger<SqsInboxSourceProcessorQueue> logger,
IServiceScopeFactory serviceScopeFactory,
ITypeResolver typeResolver,
IOptionsFactory<SqsSourceProcessorQueueOptions> optionsFactory,
string sqsOptionsName
)
{
_logger = logger;
_serviceScopeFactory = serviceScopeFactory;
_typeResolver = typeResolver;
_options = optionsFactory.Create(sqsOptionsName);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
const int maxNumberOfMessages = 1;

var active = true;

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(active ? _options.ActiveDequeueDelay : _options.IdleDequeueDelay, stoppingToken);

await using var serviceScope = _serviceScopeFactory.CreateAsyncScope();

var (amazonSqs, queueUrl) = await _options.AmazonSqsFactory.Invoke(serviceScope.ServiceProvider);

var receiveMessageResponse = await amazonSqs.ReceiveMessageAsync
(
new ReceiveMessageRequest
{
QueueUrl = queueUrl,
MaxNumberOfMessages = maxNumberOfMessages,
},
stoppingToken
);

if (receiveMessageResponse.Messages.Count != maxNumberOfMessages)
{
active = false;
continue;
}

active = true;

var receivedMessage = receiveMessageResponse.Messages[0];

var deleteRequest = new DeleteMessageRequest
{
ReceiptHandle = receivedMessage.ReceiptHandle,
};

var sqsSourceProcessorMessage =
JsonSerializer.Deserialize<SqsSourceProcessorMessage>(receivedMessage.Body) ??
throw new UnreachableException();

var sourceId = new Id(sqsSourceProcessorMessage.SourceId);
var stateId = new Id(sqsSourceProcessorMessage.StateId);

await using var sourceRepositoryFactory =
serviceScope.ServiceProvider.GetRequiredService<ISourceRepositoryFactory>();

await using var sourceRepository = await sourceRepositoryFactory.Create(sqsSourceProcessorMessage.SourceRepositoryOptionsName,
stoppingToken);

var sourceProcessorTypeName = sqsSourceProcessorMessage.SourceProcessorEnvelopeHeaders.Value[EnvelopeHelper.Type];

using var logScope = _logger.BeginScope(new KeyValuePair<string, object>[]
{
new("QueueUrl", queueUrl),
new("MessageId", receivedMessage.MessageId),
new("ReceiptHandle", receivedMessage.ReceiptHandle),
new("SourceProcessorType", sourceProcessorTypeName),
new("SourceId", sqsSourceProcessorMessage.SourceId),
new("StateId", sqsSourceProcessorMessage.StateId),
});

try
{
var source = await sourceRepository.GetSource(sourceId, stateId, stoppingToken);

var sourceProcessorType = _typeResolver.ResolveType(sqsSourceProcessorMessage.SourceProcessorEnvelopeHeaders);

var sourceProcessor =
(ISourceProcessor)serviceScope.ServiceProvider.GetRequiredService(sourceProcessorType);

_logger.Log(_options.DebugLogLevel, "Started processing source");

await sourceProcessor.Process(source, stoppingToken);

_logger.Log(_options.DebugLogLevel, "Finished processing source");

await amazonSqs.DeleteMessageAsync(deleteRequest, stoppingToken);

_logger.Log(LogLevel.Debug, "Message deleted from queue");
}
catch (Exception exception)
{
_logger.LogError(exception, "Error occurred while processing source");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using Amazon.SQS.Model;
using EntityDb.Common.Envelopes;
using EntityDb.Common.Sources.Processors.Queues;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Threading.Tasks.Dataflow;

namespace EntityDb.Aws.Sources.Processors.Queues;

[ExcludeFromCodeCoverage(Justification = "Not used in tests.")]
internal sealed class SqsOutboxSourceProcessorQueue : BackgroundService, ISourceProcessorQueue
{
private readonly BufferBlock<ISourceProcessorQueueItem> _bufferBlock = new();
private readonly ILogger<SqsOutboxSourceProcessorQueue> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly SqsSourceProcessorQueueOptions _options;
private CancellationTokenSource? _linkedStoppingTokenSource;

public SqsOutboxSourceProcessorQueue
(
ILogger<SqsOutboxSourceProcessorQueue> logger,
IServiceScopeFactory serviceScopeFactory,
IOptionsFactory<SqsSourceProcessorQueueOptions> optionsFactory,
string optionsName
)
{
_logger = logger;
_serviceScopeFactory = serviceScopeFactory;
_options = optionsFactory.Create(optionsName);
}

public void Enqueue(ISourceProcessorQueueItem item)
{
if (_linkedStoppingTokenSource is { IsCancellationRequested: true })
{
_logger.LogWarning("Application is shutting down when messages are still being received");
}

_bufferBlock.Post(item);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var linkedStoppingTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

_linkedStoppingTokenSource = linkedStoppingTokenSource;

CancellationToken cancellationToken = default;

while (await _bufferBlock.OutputAvailableAsync(cancellationToken))
{
if (_linkedStoppingTokenSource.IsCancellationRequested)
{
_logger.LogWarning("Application is shutting down when messages are still being enqueued");
}

await Task.Delay(_options.EnqueueDelay, cancellationToken);

var item = await _bufferBlock.ReceiveAsync(cancellationToken);

await using var serviceScope = _serviceScopeFactory.CreateAsyncScope();

var (amazonSqs, queueUrl) = await _options.AmazonSqsFactory.Invoke(serviceScope.ServiceProvider);

using var logScope = _logger.BeginScope(new KeyValuePair<string, object>[]
{
new("QueueUrl", queueUrl),
new("SourceProcessorType", item.SourceProcessorType.Name),
new("SourceId", item.Source.Id.Value),
});

try
{
_logger.Log(_options.DebugLogLevel, "Started enqueueing source");

var sourceProcessorEnvelopeHeaders = EnvelopeHelper.GetEnvelopeHeaders(item.SourceProcessorType);
var sourceProcessorTypeName = sourceProcessorEnvelopeHeaders.Value[EnvelopeHelper.Type];

var sourceId = item.Source.Id.Value;

foreach (var message in item.Source.Messages.DistinctBy(message => message.StatePointer.Id))
{
var stateId = message.StatePointer.Id.Value;

var sendMessageResponse = await amazonSqs.SendMessageAsync
(
new SendMessageRequest
{
QueueUrl = queueUrl,
MessageBody = JsonSerializer.Serialize(new SqsSourceProcessorMessage
{
StateId = stateId,
SourceId = sourceId,
SourceRepositoryOptionsName =
_options.GetSourceRepositoryOptionsNameFromDelta(message.Delta),
SourceProcessorEnvelopeHeaders = sourceProcessorEnvelopeHeaders,
}),
MessageGroupId = $"{sourceProcessorTypeName}:{stateId}",
MessageDeduplicationId = $"{sourceId}",
},
cancellationToken
);

_logger.Log(_options.DebugLogLevel,
"Message {MessageId} enqueued for {SourceId} and {StateId} using {SourceProcessorTypeName}",
sendMessageResponse.MessageId, sourceId, stateId, sourceProcessorTypeName);
}

_logger.Log(_options.DebugLogLevel, "Finished enqueueing source");
}
catch (Exception exception)
{
_logger.LogError(exception, "Error occurred while enqueueing source");
}
}

while (await _bufferBlock.OutputAvailableAsync(cancellationToken))
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using EntityDb.Common.Envelopes;
using System.Diagnostics.CodeAnalysis;

namespace EntityDb.Aws.Sources.Processors.Queues;

[ExcludeFromCodeCoverage(Justification = "Not used in tests.")]
internal record SqsSourceProcessorMessage
{
public required Guid StateId { get; init; }
public required Guid SourceId { get; init; }
public required string SourceRepositoryOptionsName { get; init; }
public required EnvelopeHeaders SourceProcessorEnvelopeHeaders { get; init; }
}
Loading

0 comments on commit e24368f

Please sign in to comment.