Skip to content

Commit

Permalink
Fix Azure Service Bus module DI issues (#5975)
Browse files Browse the repository at this point in the history
* Switch to Azure Service Bus for message brokering

Updated the message broker from in-memory to Azure Service Bus to enhance scalability. Added new configuration settings and dependencies to support Azure Service Bus integration, including scoped service initialization and queue/topic management.

* Refactor StartWorkers for dependency injection

Updated StartWorkers class to use constructor parameter injection and service scope factory for retrieving services. This refactor eliminates the need for field-level dependency storage, enhancing testability and adherence to the dependency injection principles. Additionally, annotated the class with [UsedImplicitly] to eliminate unused code warnings.

* Refactor Worker to use IServiceScopeFactory for dependency resolution

Refactored the Worker class to use IServiceScopeFactory for DI instead of directly injecting IWorkflowInbox. This enhances the flexibility and lifecycle management of the dependencies.

* Add Azure Service Bus integration toggle

Introduced a new configuration constant `useAzureServiceBus` to control the optional integration with Azure Service Bus. Implemented conditional logic to bind Azure Service Bus options from configuration if the flag is enabled.

* Add IComposite interface and implement Setup method

Introduced a new IComposite interface and added a default Setup method to the Composite class. Updated the ActivityFactory to call the Setup method when an activity is an instance of IComposite. The CompositeExample class demonstrates the use of the new interface and method.

* Remove AzureServiceBus configuration setup

The AzureServiceBus configuration setup was removed from the Program.cs file. This change aims to streamline configuration and remove unused or unnecessary setup, ensuring the code remains clean and maintainable.

* Switch MassTransit broker to in-memory

Changed the MassTransit broker from AzureServiceBus to Memory in the application configuration. This adjustment aims to simplify deployment and reduce dependencies in the current environment.
  • Loading branch information
sfmskywalker authored Sep 19, 2024
1 parent 7c31332 commit a827ab2
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 32 deletions.
40 changes: 40 additions & 0 deletions src/bundles/Elsa.Server.Web/Activities/CompositeExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Elsa.AzureServiceBus.Activities;
using Elsa.Extensions;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;

namespace Elsa.Server.Web.Activities;

[Activity("Elsa", "Example")]
public class CompositeExample : Composite
{
/// <summary>
/// The name of the queue or topic to read from.
/// </summary>
[Input(Description = "The name of the queue or topic to read from.")]
public Input<string> QueueOrTopic { get; set; } = default!;

private MessageReceived _messageReceived = default!;

/// <inheritdoc />
public override void Setup()
{
_messageReceived = new MessageReceived
{
QueueOrTopic = QueueOrTopic,
CanStartWorkflow = true
};

var writeLine = new WriteLine("Hello World!");

Root = new Sequence
{
Activities =
{
_messageReceived,
writeLine
}
};
}
}
1 change: 1 addition & 0 deletions src/bundles/Elsa.Server.Web/Elsa.Server.Web.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\modules\Elsa.AzureServiceBus\Elsa.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\modules\Elsa.Caching.Distributed.MassTransit\Elsa.Caching.Distributed.MassTransit.csproj"/>
<ProjectReference Include="..\..\modules\Elsa.EntityFrameworkCore.PostgreSql\Elsa.EntityFrameworkCore.PostgreSql.csproj"/>
<ProjectReference Include="..\..\modules\Elsa.MassTransit.AzureServiceBus\Elsa.MassTransit.AzureServiceBus.csproj"/>
Expand Down
4 changes: 4 additions & 0 deletions src/bundles/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
const bool useCaching = true;
const bool useReadOnlyMode = false;
const bool useSignalR = true;
const bool useAzureServiceBus = false;
const DistributedCachingTransport distributedCachingTransport = DistributedCachingTransport.MassTransit;
const MassTransitBroker useMassTransitBroker = MassTransitBroker.Memory;

Expand Down Expand Up @@ -327,6 +328,9 @@
elsa.UseRealTimeWorkflows();
}
if (useAzureServiceBus)
elsa.UseAzureServiceBus(asb => asb.AzureServiceBusOptions += options => configuration.GetSection("AzureServiceBus").Bind(options));
if (useMassTransit)
{
elsa.UseMassTransit(massTransit =>
Expand Down
13 changes: 13 additions & 0 deletions src/bundles/Elsa.Server.Web/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,18 @@
"def say_hello_world(): return greet('World')"
]
}
},
"AzureServiceBus": {
"ConnectionStringOrName": "AzureServiceBus",
"Queues": [
{
"name": "order-created"
},
{
"name": "order-completed"
}
],
"Topics": [],
"Subscriptions": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public override void Apply()
.AddSingleton(ServiceBusClientFactory)
.AddSingleton<ConfigurationQueueTopicAndSubscriptionProvider>()
.AddSingleton<IWorkerManager, WorkerManager>()
.AddTransient<IServiceBusInitializer, ServiceBusInitializer>();
.AddScoped<IServiceBusInitializer, ServiceBusInitializer>();

// Definition providers.
Services
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
using Elsa.AzureServiceBus.Contracts;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Elsa.AzureServiceBus.HostedServices;

/// <summary>
/// A blocking hosted service that creates queues, topics and subscriptions.
/// </summary>
public class CreateQueuesTopicsAndSubscriptions : IHostedService
[UsedImplicitly]
public class CreateQueuesTopicsAndSubscriptions(IServiceScopeFactory scopeFactory) : IHostedService
{
private readonly IServiceBusInitializer _serviceBusInitializer;
/// <summary>
/// Constructor.
/// </summary>
public CreateQueuesTopicsAndSubscriptions(IServiceBusInitializer serviceBusInitializer) => _serviceBusInitializer = serviceBusInitializer;

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken) => _serviceBusInitializer.InitializeAsync(cancellationToken);
public async Task StartAsync(CancellationToken cancellationToken)
{
await using var scope = scopeFactory.CreateAsyncScope();
var initializer = scope.ServiceProvider.GetRequiredService<IServiceBusInitializer>();
await initializer.InitializeAsync(cancellationToken);
}

/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
Expand Down
28 changes: 10 additions & 18 deletions src/modules/Elsa.AzureServiceBus/HostedServices/StartWorkers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,29 @@
using Elsa.Workflows.Helpers;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Filters;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Elsa.AzureServiceBus.HostedServices;

/// <summary>
/// Creates workers for each trigger &amp; bookmark in response to updated workflow trigger indexes and bookmarks.
/// </summary>
public class StartWorkers : IHostedService
[UsedImplicitly]
public class StartWorkers(IWorkerManager workerManager, IServiceScopeFactory scopeFactory) : IHostedService
{
private readonly ITriggerStore _triggerStore;
private readonly IBookmarkStore _bookmarkStore;
private readonly IWorkerManager _workerManager;

/// <summary>
/// Constructor.
/// </summary>
public StartWorkers(ITriggerStore triggerStore, IBookmarkStore bookmarkStore, IWorkerManager workerManager)
{
_triggerStore = triggerStore;
_bookmarkStore = bookmarkStore;
_workerManager = workerManager;
}

/// <inheritdoc />
public async Task StartAsync(CancellationToken cancellationToken)
{
using var scope = scopeFactory.CreateScope();
var triggerStore = scope.ServiceProvider.GetRequiredService<ITriggerStore>();
var bookmarkStore = scope.ServiceProvider.GetRequiredService<IBookmarkStore>();
var activityType = ActivityTypeNameHelper.GenerateTypeName<MessageReceived>();
var triggerFilter = new TriggerFilter { Name = activityType};
var triggers = (await _triggerStore.FindManyAsync(triggerFilter, cancellationToken)).Select(x => x.GetPayload<MessageReceivedTriggerPayload>()).ToList();
var triggers = (await triggerStore.FindManyAsync(triggerFilter, cancellationToken)).Select(x => x.GetPayload<MessageReceivedTriggerPayload>()).ToList();
var bookmarkFilter = new BookmarkFilter { ActivityTypeName = activityType };
var bookmarks = (await _bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken)).Select(x => x.GetPayload<MessageReceivedTriggerPayload>()).ToList();
var bookmarks = (await bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken)).Select(x => x.GetPayload<MessageReceivedTriggerPayload>()).ToList();
var payloads = triggers.Concat(bookmarks).ToList();

await EnsureWorkersAsync(payloads, cancellationToken);
Expand All @@ -46,6 +38,6 @@ public async Task StartAsync(CancellationToken cancellationToken)

private async Task EnsureWorkersAsync(IEnumerable<MessageReceivedTriggerPayload> payloads, CancellationToken cancellationToken)
{
foreach (var payload in payloads) await _workerManager.EnsureWorkerAsync(payload.QueueOrTopic, payload.Subscription, cancellationToken);
foreach (var payload in payloads) await workerManager.EnsureWorkerAsync(payload.QueueOrTopic, payload.Subscription, cancellationToken);
}
}
11 changes: 7 additions & 4 deletions src/modules/Elsa.AzureServiceBus/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Elsa.Workflows.Helpers;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Elsa.AzureServiceBus.Services;
Expand All @@ -15,18 +16,18 @@ namespace Elsa.AzureServiceBus.Services;
public class Worker : IAsyncDisposable
{
private readonly ServiceBusProcessor _processor;
private readonly IWorkflowInbox _workflowInbox;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger _logger;
private int _refCount = 1;

/// <summary>
/// Initializes a new instance of the <see cref="Worker"/> class.
/// </summary>
public Worker(string queueOrTopic, string? subscription, ServiceBusClient client, IWorkflowInbox workflowInbox, ILogger<Worker> logger)
public Worker(string queueOrTopic, string? subscription, ServiceBusClient client, IServiceScopeFactory scopeFactory, ILogger<Worker> logger)
{
QueueOrTopic = queueOrTopic;
Subscription = subscription == "" ? default : subscription;
_workflowInbox = workflowInbox;
_scopeFactory = scopeFactory;
_logger = logger;

var options = new ServiceBusProcessorOptions();
Expand Down Expand Up @@ -98,8 +99,10 @@ private async Task InvokeWorkflowsAsync(ServiceBusReceivedMessage message, Cance
var messageModel = CreateMessageModel(message);
var input = new Dictionary<string, object> { [MessageReceived.InputKey] = messageModel };
var activityTypeName = ActivityTypeNameHelper.GenerateTypeName<MessageReceived>();
await using var scope = _scopeFactory.CreateAsyncScope();
var workflowInbox = scope.ServiceProvider.GetRequiredService<IWorkflowInbox>();

var results = await _workflowInbox.SubmitAsync(new NewWorkflowInboxMessage
var results = await workflowInbox.SubmitAsync(new NewWorkflowInboxMessage
{
ActivityTypeName = activityTypeName,
BookmarkPayload = payload,
Expand Down
6 changes: 5 additions & 1 deletion src/modules/Elsa.Workflows.Core/Activities/Composite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Elsa.Workflows.Activities;
/// Represents a composite activity that has a single <see cref="Root"/> activity. Like a workflow, but without workflow-level properties.
/// </summary>
[PublicAPI]
public abstract class Composite : Activity, IVariableContainer
public abstract class Composite : Activity, IVariableContainer, IComposite
{
/// <inheritdoc />
protected Composite(string? source = default, int? line = default) : base(source, line)
Expand Down Expand Up @@ -172,6 +172,10 @@ private async ValueTask OnCompleteCompositeSignal(CompleteCompositeSignal signal
/// Creates a new <see cref="Activities.SetVariable"/> activity.
/// </summary>
protected static SetVariable<T> SetVariable<T>(Variable<T> variable, Variable<T> value, [CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) => new(variable, value, source, line);

public virtual void Setup()
{
}
}

/// <summary>
Expand Down
6 changes: 6 additions & 0 deletions src/modules/Elsa.Workflows.Core/Contracts/IComposite.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Elsa.Workflows.Contracts;

public interface IComposite
{
void Setup();
}
3 changes: 3 additions & 0 deletions src/modules/Elsa.Workflows.Core/Services/ActivityFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public IActivity Create(Type type, ActivityConstructorContext context)
var activityElement = context.Element;
var activityDescriptor = context.ActivityDescriptor;
var activity = (IActivity)context.Element.Deserialize(type, context.SerializerOptions)!;
var composite = activity as IComposite;

composite?.Setup();

ReadSyntheticInputs(activityDescriptor, activity, activityElement, context.SerializerOptions);
ReadSyntheticOutputs(activityDescriptor, activity, activityElement);
Expand Down

0 comments on commit a827ab2

Please sign in to comment.