Skip to content

Commit

Permalink
Add synchronous serialization methods and mark async as obsolete
Browse files Browse the repository at this point in the history
Implemented synchronous methods for serialization and deserialization while marking the asynchronous methods as obsolete across various storage and serialization services. This includes updates to handle serialization within the Save and Load methods, enhancing performance by avoiding unnecessary Task usage.
  • Loading branch information
sfmskywalker committed Sep 27, 2024
1 parent 7609b0e commit 2e1ed99
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 79 deletions.
41 changes: 41 additions & 0 deletions src/bundles/Elsa.Server.Web/SampleWorkflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Scheduling.Activities;
using Elsa.Workflows;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Contracts;

namespace Elsa.Server.Web;

public class SampleWorkflow : WorkflowBase
{
protected override void Build(IWorkflowBuilder workflow)
{
// The WithVariable method ensures that the created variable will be added to the Workflow's Variables collection, which is required for persistent variables.
var variable1 = workflow.WithVariable<string>("Foo").WithWorkflowStorage();

workflow.Variables =
[
variable1
];

workflow.Root = new Sequence
{
Activities =
{
new StartAt(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(5))
{
CanStartWorkflow = true
},
new WriteLine(variable1),
new SetVariable
{
Variable = variable1,
Value = new (Literal.From("Bar"))
},
new Delay(TimeSpan.FromSeconds(1)),
new WriteLine(variable1)
}
};
}
}
2 changes: 1 addition & 1 deletion src/bundles/Elsa.Server.Web/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Default": "Warning",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information",
"OpenTelemetry": "Debug"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,36 @@ public DapperActivityExecutionRecordStore(IDbConnectionProvider dbConnectionProv
/// <inheritdoc />
public async Task SaveAsync(ActivityExecutionRecord record, CancellationToken cancellationToken = default)
{
var mappedRecord = await Map(record, cancellationToken);
var mappedRecord = Map(record, cancellationToken);
await _store.SaveAsync(mappedRecord, PrimaryKeyName, cancellationToken);
}

/// <inheritdoc />
public async Task SaveManyAsync(IEnumerable<ActivityExecutionRecord> records, CancellationToken cancellationToken = default)
{
var mappedRecords = await Task.WhenAll(records.Select(async x => await Map(x, cancellationToken)));
var mappedRecords = records.Select(x => Map(x, cancellationToken));
await _store.SaveManyAsync(mappedRecords, PrimaryKeyName, cancellationToken);
}

/// <inheritdoc />
public async Task<ActivityExecutionRecord?> FindAsync(ActivityExecutionRecordFilter filter, CancellationToken cancellationToken = default)
{
var record = await _store.FindAsync(q => ApplyFilter(q, filter), cancellationToken);
return record == null ? null : await MapAsync(record, cancellationToken);
return record == null ? null : Map(record, cancellationToken);
}

/// <inheritdoc />
public async Task<IEnumerable<ActivityExecutionRecord>> FindManyAsync<TOrderBy>(ActivityExecutionRecordFilter filter, ActivityExecutionRecordOrder<TOrderBy> order, CancellationToken cancellationToken = default)
{
var records = await _store.FindManyAsync(q => ApplyFilter(q, filter), order.KeySelector.GetPropertyName(), order.Direction, cancellationToken);
return await Task.WhenAll(records.Select(async x => await MapAsync(x, cancellationToken)));
return records.Select( x => Map(x, cancellationToken)).ToList();
}

/// <inheritdoc />
public async Task<IEnumerable<ActivityExecutionRecord>> FindManyAsync(ActivityExecutionRecordFilter filter, CancellationToken cancellationToken = default)
{
var records = await _store.FindManyAsync(q => ApplyFilter(q, filter), cancellationToken);
return await Task.WhenAll(records.Select(async x => await MapAsync(x, cancellationToken)));
return records.Select( x => Map(x, cancellationToken)).ToList();
}

/// <inheritdoc />
Expand Down Expand Up @@ -115,7 +115,7 @@ private static void ApplyFilter(ParameterizedQuery query, ActivityExecutionRecor
}
}

private async ValueTask<ActivityExecutionRecordRecord> Map(ActivityExecutionRecord source, CancellationToken cancellationToken)
private ActivityExecutionRecordRecord Map(ActivityExecutionRecord source, CancellationToken cancellationToken)
{
return new ActivityExecutionRecordRecord
{
Expand All @@ -130,15 +130,15 @@ private async ValueTask<ActivityExecutionRecordRecord> Map(ActivityExecutionReco
HasBookmarks = source.HasBookmarks,
Status = source.Status.ToString(),
ActivityTypeVersion = source.ActivityTypeVersion,
SerializedActivityState = source.ActivityState != null ? await _safeSerializer.SerializeAsync(source.ActivityState, cancellationToken) : null,
SerializedPayload = source.Payload != null ? await _safeSerializer.SerializeAsync(source.Payload, cancellationToken) : null,
SerializedOutputs = source.Outputs?.Any() == true ? await _safeSerializer.SerializeAsync(source.Outputs, cancellationToken) : null,
SerializedActivityState = source.ActivityState != null ? _safeSerializer.Serialize(source.ActivityState, cancellationToken) : null,
SerializedPayload = source.Payload != null ? _safeSerializer.Serialize(source.Payload, cancellationToken) : null,
SerializedOutputs = source.Outputs?.Any() == true ? _safeSerializer.Serialize(source.Outputs, cancellationToken) : null,
SerializedException = source.Exception != null ? _payloadSerializer.Serialize(source.Exception) : null,
SerializedProperties = source.Properties.Any() ? await _safeSerializer.SerializeAsync(source.Properties, cancellationToken) : null
SerializedProperties = source.Properties.Any() ? _safeSerializer.Serialize(source.Properties, cancellationToken) : null
};
}

private async ValueTask<ActivityExecutionRecord> MapAsync(ActivityExecutionRecordRecord source, CancellationToken cancellationToken)
private ActivityExecutionRecord Map(ActivityExecutionRecordRecord source, CancellationToken cancellationToken)
{
return new ActivityExecutionRecord
{
Expand All @@ -153,11 +153,11 @@ private async ValueTask<ActivityExecutionRecord> MapAsync(ActivityExecutionRecor
HasBookmarks = source.HasBookmarks,
Status = Enum.Parse<ActivityStatus>(source.Status),
ActivityTypeVersion = source.ActivityTypeVersion,
ActivityState = source.SerializedActivityState != null ? _payloadSerializer.Deserialize<IDictionary<string, object>>(source.SerializedActivityState) : default,
Payload = source.SerializedPayload != null ? await _safeSerializer.DeserializeAsync<IDictionary<string, object>>(source.SerializedPayload, cancellationToken) : default,
Outputs = source.SerializedOutputs != null ? await _safeSerializer.DeserializeAsync<IDictionary<string, object?>>(source.SerializedOutputs, cancellationToken) : default,
Exception = source.SerializedException != null ? _payloadSerializer.Deserialize<ExceptionState>(source.SerializedException) : default,
Properties = source.SerializedProperties != null ? await _safeSerializer.DeserializeAsync<IDictionary<string, object>>(source.SerializedProperties, cancellationToken) : default
ActivityState = source.SerializedActivityState != null ? _payloadSerializer.Deserialize<IDictionary<string, object>>(source.SerializedActivityState) : null,
Payload = source.SerializedPayload != null ? _safeSerializer.Deserialize<IDictionary<string, object>>(source.SerializedPayload, cancellationToken) : null,
Outputs = source.SerializedOutputs != null ? _safeSerializer.Deserialize<IDictionary<string, object?>>(source.SerializedOutputs, cancellationToken) : null,
Exception = source.SerializedException != null ? _payloadSerializer.Deserialize<ExceptionState>(source.SerializedException) : null,
Properties = source.SerializedProperties != null ? _safeSerializer.Deserialize<IDictionary<string, object>>(source.SerializedProperties, cancellationToken) : null
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ private async ValueTask OnSaveAsync(RuntimeElsaDbContext dbContext, ActivityExec
{
entity = entity.SanitizeLogMessage();
var compressionAlgorithm = options.Value.CompressionAlgorithm ?? nameof(None);
var serializedActivityState = entity.ActivityState != null ? await safeSerializer.SerializeAsync(entity.ActivityState, cancellationToken) : null;
var serializedActivityState = entity.ActivityState != null ? safeSerializer.Serialize(entity.ActivityState, cancellationToken) : null;
var compressedSerializedActivityState = serializedActivityState != null ? await compressionCodecResolver.Resolve(compressionAlgorithm).CompressAsync(serializedActivityState, cancellationToken) : null;

dbContext.Entry(entity).Property("SerializedActivityState").CurrentValue = compressedSerializedActivityState;
dbContext.Entry(entity).Property("SerializedActivityStateCompressionAlgorithm").CurrentValue = compressionAlgorithm;
dbContext.Entry(entity).Property("SerializedOutputs").CurrentValue = entity.Outputs?.Any() == true ? await safeSerializer.SerializeAsync(entity.Outputs, cancellationToken) : null;
dbContext.Entry(entity).Property("SerializedOutputs").CurrentValue = entity.Outputs?.Any() == true ? safeSerializer.Serialize(entity.Outputs, cancellationToken) : null;
dbContext.Entry(entity).Property("SerializedProperties").CurrentValue = entity.Properties.Any() ? payloadSerializer.Serialize(entity.Properties) : null;
dbContext.Entry(entity).Property("SerializedException").CurrentValue = entity.Exception != null ? payloadSerializer.Serialize(entity.Exception) : null;
dbContext.Entry(entity).Property("SerializedPayload").CurrentValue = entity.Payload?.Any() == true ? payloadSerializer.Serialize(entity.Payload) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public async Task<long> DeleteManyAsync(WorkflowExecutionLogRecordFilter filter,
private async ValueTask OnSaveAsync(RuntimeElsaDbContext dbContext, WorkflowExecutionLogRecord entity, CancellationToken cancellationToken)
{
entity = entity.SanitizeLogMessage();
dbContext.Entry(entity).Property("SerializedActivityState").CurrentValue = entity.ActivityState?.Any() == true ? await _safeSerializer.SerializeAsync(entity.ActivityState, cancellationToken) : default;
dbContext.Entry(entity).Property("SerializedPayload").CurrentValue = entity.Payload != null ? await _safeSerializer.SerializeAsync(entity.Payload, cancellationToken) : default;
dbContext.Entry(entity).Property("SerializedActivityState").CurrentValue = entity.ActivityState?.Any() == true ? _safeSerializer.Serialize(entity.ActivityState, cancellationToken) : null;
dbContext.Entry(entity).Property("SerializedPayload").CurrentValue = entity.Payload != null ? _safeSerializer.Serialize(entity.Payload, cancellationToken) : null;
}

private async ValueTask OnLoadAsync(RuntimeElsaDbContext dbContext, WorkflowExecutionLogRecord? entity, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ private async Task<ExportedWorkflowState> CreateExportModelAsync(Request request
var executionLogRecords = request.IncludeWorkflowExecutionLog ? await LoadWorkflowExecutionLogRecordsAsync(workflowState.Id, cancellationToken) : default;
var activityExecutionLogRecords = request.IncludeActivityExecutionLog ? await LoadActivityExecutionLogRecordsAsync(workflowState.Id, cancellationToken) : default;
var bookmarks = request.IncludeBookmarks ? await LoadBookmarksAsync(workflowState.Id, cancellationToken) : null;
var workflowStateElement = await _workflowStateSerializer.SerializeToElementAsync(workflowState, cancellationToken);
var workflowStateElement = _workflowStateSerializer.SerializeToElement(workflowState, cancellationToken);
var bookmarksElement = bookmarks != null ? SerializeBookmarks(bookmarks) : default(JsonElement?);
var executionLogRecordsElement = executionLogRecords != null ? await _safeSerializer.SerializeToElementAsync(executionLogRecords, cancellationToken) : default(JsonElement?);
var activityExecutionLogRecordsElement = activityExecutionLogRecords != null ? await _safeSerializer.SerializeToElementAsync(activityExecutionLogRecords, cancellationToken) : default(JsonElement?);
var executionLogRecordsElement = executionLogRecords != null ? _safeSerializer.SerializeToElement(executionLogRecords, cancellationToken) : default(JsonElement?);
var activityExecutionLogRecordsElement = activityExecutionLogRecords != null ? _safeSerializer.SerializeToElement(activityExecutionLogRecords, cancellationToken) : default(JsonElement?);
var model = new ExportedWorkflowState(workflowStateElement, bookmarksElement, activityExecutionLogRecordsElement, executionLogRecordsElement);
return model;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ private async Task ImportSingleWorkflowInstanceAsync(ExportedWorkflowState model

if (model.ActivityExecutionRecords != null)
{
var activityExecutionRecords = await _safeSerializer.DeserializeAsync<ICollection<ActivityExecutionRecord>>(model.ActivityExecutionRecords.Value, cancellationToken);
var activityExecutionRecords = _safeSerializer.Deserialize<ICollection<ActivityExecutionRecord>>(model.ActivityExecutionRecords.Value, cancellationToken);
await _activityExecutionStore.SaveManyAsync(activityExecutionRecords, cancellationToken);
}

if (model.WorkflowExecutionLogRecords != null)
{
var workflowExecutionLogRecords = await _safeSerializer.DeserializeAsync<ICollection<WorkflowExecutionLogRecord>>(model.WorkflowExecutionLogRecords.Value, cancellationToken);
var workflowExecutionLogRecords = _safeSerializer.Deserialize<ICollection<WorkflowExecutionLogRecord>>(model.WorkflowExecutionLogRecords.Value, cancellationToken);
await _workflowExecutionLogStore.SaveManyAsync(workflowExecutionLogRecords, cancellationToken);
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/modules/Elsa.Workflows.Core/Contracts/ISafeSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,52 @@ public interface ISafeSerializer
/// <summary>
/// Serializes the specified state.
/// </summary>
[Obsolete("Use the non-async Serialize instead.")]
[RequiresUnreferencedCode("The type T may be trimmed.")]
ValueTask<string> SerializeAsync(object? value, CancellationToken cancellationToken = default);

/// <summary>
/// Serializes the specified state to a <see cref="JsonElement"/> object.
/// </summary>
[Obsolete("Use the non-async SerializeToElement instead.")]
[RequiresUnreferencedCode("The type T may be trimmed.")]
ValueTask<JsonElement> SerializeToElementAsync(object? value, CancellationToken cancellationToken = default);

/// <summary>
/// Deserializes the specified state.
/// </summary>
[Obsolete("Use the non-async Deserialize instead.")]
[RequiresUnreferencedCode("The type T may be trimmed.")]
ValueTask<T> DeserializeAsync<T>(string json, CancellationToken cancellationToken = default);

/// <summary>
/// Deserializes the specified state.
/// </summary>
[Obsolete("Use the non-async Deserialize instead.")]
[RequiresUnreferencedCode("The type T may be trimmed.")]
ValueTask<T> DeserializeAsync<T>(JsonElement element, CancellationToken cancellationToken = default);

/// <summary>
/// Serializes the specified state.
/// </summary>
[RequiresUnreferencedCode("The type T may be trimmed.")]
string Serialize(object? value, CancellationToken cancellationToken = default);

/// <summary>
/// Serializes the specified state to a <see cref="JsonElement"/> object.
/// </summary>
[RequiresUnreferencedCode("The type T may be trimmed.")]
JsonElement SerializeToElement(object? value, CancellationToken cancellationToken = default);

/// <summary>
/// Deserializes the specified state.
/// </summary>
[RequiresUnreferencedCode("The type T may be trimmed.")]
T Deserialize<T>(string json, CancellationToken cancellationToken = default);

/// <summary>
/// Deserializes the specified state.
/// </summary>
[RequiresUnreferencedCode("The type T may be trimmed.")]
T Deserialize<T>(JsonElement element, CancellationToken cancellationToken = default);
}
Loading

0 comments on commit 2e1ed99

Please sign in to comment.