From 1aa114189c5e9059926251c4dda35cdf1879167a Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:46:22 +0200 Subject: [PATCH 01/10] Removing RulesClass so that it doesn't get registered as a projection / observer --- Source/Clients/DotNET/Rules/RulesProjections.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Source/Clients/DotNET/Rules/RulesProjections.cs b/Source/Clients/DotNET/Rules/RulesProjections.cs index 47f5ec147..327c90a84 100644 --- a/Source/Clients/DotNET/Rules/RulesProjections.cs +++ b/Source/Clients/DotNET/Rules/RulesProjections.cs @@ -18,11 +18,6 @@ namespace Aksio.Cratis.Rules; [Singleton] public class RulesProjections : IRulesProjections { - sealed class RulesClass : RulesFor - { - public override RuleId Identifier => Guid.Empty; - } - readonly IEventTypes _eventTypes; readonly IModelNameConvention _modelNameConvention; readonly IJsonSchemaGenerator _jsonSchemaGenerator; @@ -86,7 +81,7 @@ ProjectionDefinition CreateProjection(IRule rule) var ruleType = typeof(TTarget); - var defineStateMethod = ruleType.GetMethod(nameof(RulesClass.DefineState), BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + var defineStateMethod = ruleType.GetMethod("DefineState", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); if (defineStateMethod is not null) { var parameters = defineStateMethod.GetParameters(); From 89146bba9250ce2f2b9c15d1c09cb539f16d17cf Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:49:39 +0200 Subject: [PATCH 02/10] Adding .ConfigureAwait(false) to MongoDB operations --- .../MongoDB/EventSequences/MongoDBEventSequenceStorage.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Source/Kernel/MongoDB/EventSequences/MongoDBEventSequenceStorage.cs b/Source/Kernel/MongoDB/EventSequences/MongoDBEventSequenceStorage.cs index 75c517afc..243a0fa8e 100644 --- a/Source/Kernel/MongoDB/EventSequences/MongoDBEventSequenceStorage.cs +++ b/Source/Kernel/MongoDB/EventSequences/MongoDBEventSequenceStorage.cs @@ -61,10 +61,10 @@ public MongoDBEventSequenceStorage( } /// - public Task GetCount(EventSequenceId eventSequenceId) + public async Task GetCount(EventSequenceId eventSequenceId) { var collection = GetCollectionFor(eventSequenceId); - return collection.CountDocumentsAsync(FilterDefinition.Empty); + return await collection.CountDocumentsAsync(FilterDefinition.Empty).ConfigureAwait(false); } /// @@ -103,7 +103,7 @@ public async Task Append( }, Array.Empty()); var collection = GetCollectionFor(eventSequenceId); - await collection.InsertOneAsync(@event); + await collection.InsertOneAsync(@event).ConfigureAwait(false); } catch (MongoWriteException writeException) when (writeException.WriteError.Category == ServerErrorCategory.DuplicateKey) { @@ -156,7 +156,7 @@ public async Task Redact( } var updateModel = CreateRedactionUpdateModelFor(@event, reason, causation, causedByChain); - collection.UpdateOne(updateModel.Filter, updateModel.Update); + await collection.UpdateOneAsync(updateModel.Filter, updateModel.Update).ConfigureAwait(false); return @event; } From 9fa12e27cf9b61edf0a8fcef123e8551e3d4a3ea Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:50:03 +0200 Subject: [PATCH 03/10] Moving away from concurrent dictionary to regular, should be safe --- .../Engines/Projections/Definitions/ProjectionDefinitions.cs | 5 ++--- .../Projections/Definitions/ProjectionPipelineDefinitions.cs | 5 ++--- Source/Kernel/Engines/Projections/ProjectionManager.cs | 5 ++--- Source/Kernel/Engines/Projections/ProjectionSinks.cs | 3 +-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/Source/Kernel/Engines/Projections/Definitions/ProjectionDefinitions.cs b/Source/Kernel/Engines/Projections/Definitions/ProjectionDefinitions.cs index a6e662c10..b0248dd2c 100644 --- a/Source/Kernel/Engines/Projections/Definitions/ProjectionDefinitions.cs +++ b/Source/Kernel/Engines/Projections/Definitions/ProjectionDefinitions.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Projections; using Aksio.Cratis.Projections.Definitions; using Aksio.Cratis.Projections.Json; @@ -16,7 +15,7 @@ public class ProjectionDefinitions : IProjectionDefinitions { readonly IProjectionDefinitionsStorage _storage; readonly IJsonProjectionSerializer _projectionSerializer; - readonly ConcurrentDictionary _definitions = new(); + readonly Dictionary _definitions = new(); /// /// Initializes a new instance of the class. @@ -87,7 +86,7 @@ void ThrowIfMissingProjectionDefinition(ProjectionId identifier) async Task PopulateIfEmpty() { - if (_definitions.IsEmpty) + if (_definitions.Count == 0) { await Populate(); } diff --git a/Source/Kernel/Engines/Projections/Definitions/ProjectionPipelineDefinitions.cs b/Source/Kernel/Engines/Projections/Definitions/ProjectionPipelineDefinitions.cs index 9c84095ad..d6156a7e8 100644 --- a/Source/Kernel/Engines/Projections/Definitions/ProjectionPipelineDefinitions.cs +++ b/Source/Kernel/Engines/Projections/Definitions/ProjectionPipelineDefinitions.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Projections; using Aksio.Cratis.Projections.Definitions; @@ -14,7 +13,7 @@ namespace Aksio.Cratis.Kernel.Engines.Projections.Definitions; public class ProjectionPipelineDefinitions : IProjectionPipelineDefinitions { readonly IProjectionPipelineDefinitionsStorage _storage; - readonly ConcurrentDictionary _definitions = new(); + readonly Dictionary _definitions = new(); /// /// Initializes a new instance of the class. @@ -65,7 +64,7 @@ async Task PopulateIfMissing(ProjectionId projectionId) async Task PopulateIfEmpty() { - if (_definitions.IsEmpty) + if (_definitions.Count == 0) { await Populate(); } diff --git a/Source/Kernel/Engines/Projections/ProjectionManager.cs b/Source/Kernel/Engines/Projections/ProjectionManager.cs index 56ca1abf1..f63c01986 100644 --- a/Source/Kernel/Engines/Projections/ProjectionManager.cs +++ b/Source/Kernel/Engines/Projections/ProjectionManager.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Kernel.Engines.Projections.Pipelines; using Aksio.Cratis.Projections; using Aksio.Cratis.Projections.Definitions; @@ -16,8 +15,8 @@ public class ProjectionManager : IProjectionManager { readonly IProjectionFactory _projectionFactory; readonly IProjectionPipelineFactory _projectionPipelineFactory; - readonly ConcurrentDictionary _projections = new(); - readonly ConcurrentDictionary _pipelines = new(); + readonly Dictionary _projections = new(); + readonly Dictionary _pipelines = new(); /// /// Initializes a new instance of the class. diff --git a/Source/Kernel/Engines/Projections/ProjectionSinks.cs b/Source/Kernel/Engines/Projections/ProjectionSinks.cs index 2112907f9..ffe0bed0d 100644 --- a/Source/Kernel/Engines/Projections/ProjectionSinks.cs +++ b/Source/Kernel/Engines/Projections/ProjectionSinks.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Projections; using Aksio.Types; @@ -16,7 +15,7 @@ public class ProjectionSinks : IProjectionSinks sealed record Key(ProjectionSinkTypeId TypeId, ProjectionId ProjectionId); readonly IDictionary _factories; - readonly ConcurrentDictionary _stores = new(); + readonly Dictionary _stores = new(); /// /// Initializes a new instance of the class. From 33d2cce3f67bbbda873dbef681ec9ff5f02e60ec Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:50:25 +0200 Subject: [PATCH 04/10] Fixing XML comments --- Source/Clients/DotNET/EventSequences/IEventSequence.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Clients/DotNET/EventSequences/IEventSequence.cs b/Source/Clients/DotNET/EventSequences/IEventSequence.cs index dd76f2590..e576bf6d6 100644 --- a/Source/Clients/DotNET/EventSequences/IEventSequence.cs +++ b/Source/Clients/DotNET/EventSequences/IEventSequence.cs @@ -42,7 +42,7 @@ public interface IEventSequence Task Append(EventSourceId eventSourceId, object @event, DateTimeOffset? validFrom = default); /// - /// Append a single event to the event store. + /// Append a collection of events to the event store. /// /// The to append for. /// Collection of events to append. @@ -50,7 +50,7 @@ public interface IEventSequence Task AppendMany(EventSourceId eventSourceId, IEnumerable events); /// - /// Append a single event to the event store. + /// Append a collection of events to the event store with valid from per event. /// /// The to append for. /// Collection of events with valid from to append. From 942e4d08a4ce7e0ae91b8b7164c8eb93bc29f9b5 Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:51:17 +0200 Subject: [PATCH 05/10] Priming event sequence caches at startup, rather than lazily This is to avoid deadlocks during hook up of observers --- .../Streaming/EventSequenceCache.cs | 21 +++++++-------- .../Streaming/EventSequenceCaches.cs | 27 ++++++++++++++++--- .../Streaming/EventSequenceQueueAdapter.cs | 3 +-- .../EventSequenceQueueAdapterReceiver.cs | 5 ++-- .../Streaming/IEventSequenceCache.cs | 6 +++++ .../Streaming/IEventSequenceCaches.cs | 6 +++++ Source/Kernel/Server/BootProcedure.cs | 15 ++++++++++- .../Kernel/Server/BootProcedureLogMessages.cs | 19 +++++++++++++ .../given/an_event_sequence_caches.cs | 5 +++- 9 files changed, 86 insertions(+), 21 deletions(-) create mode 100644 Source/Kernel/Server/BootProcedureLogMessages.cs diff --git a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCache.cs b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCache.cs index 1cdbd826b..ae3c73b81 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCache.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCache.cs @@ -80,8 +80,6 @@ public EventSequenceCache( _executionContextManager = executionContextManager; _eventSequenceStorageProvider = eventSequenceStorageProvider; _logger = logger; - - PreFillWithTailWindow(); } /// @@ -155,6 +153,16 @@ public void Purge() } } + /// + public async Task PrimeWithTailWindow() + { + _executionContextManager.Establish(_tenantId, CorrelationId.New(), _microserviceId); + var tail = await _eventSequenceStorageProvider().GetTailSequenceNumber(_eventSequenceId); + tail -= NumberOfEventsToFetch; + if ((long)tail.Value < 0) tail = 0; + Prime(tail); + } + void AddImplementation(AppendedEvent @event) { if (_eventsBySequenceNumber.ContainsKey(@event.Metadata.SequenceNumber)) @@ -192,13 +200,4 @@ void RelievePressure() } } } - - void PreFillWithTailWindow() - { - _executionContextManager.Establish(_tenantId, CorrelationId.New(), _microserviceId); - var tail = _eventSequenceStorageProvider().GetTailSequenceNumber(_eventSequenceId).GetAwaiter().GetResult(); - tail -= NumberOfEventsToFetch; - if ((long)tail.Value < 0) tail = 0; - Prime(tail); - } } diff --git a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCaches.cs b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCaches.cs index be2a4fa41..94b5e87f4 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCaches.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCaches.cs @@ -1,8 +1,8 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.EventSequences; +using Aksio.Cratis.Kernel.Configuration; namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming; @@ -12,16 +12,21 @@ namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming; [Singleton] public class EventSequenceCaches : IEventSequenceCaches { - readonly ConcurrentDictionary<(MicroserviceId, TenantId, EventSequenceId), IEventSequenceCache> _caches = new(); + readonly Dictionary<(MicroserviceId, TenantId, EventSequenceId), IEventSequenceCache> _caches = new(); readonly IEventSequenceCacheFactory _eventSequenceCacheFactory; + readonly KernelConfiguration _configuration; /// /// Initializes a new instance of the class. /// /// for creating instances. - public EventSequenceCaches(IEventSequenceCacheFactory eventSequenceCacheFactory) + /// The ./// + public EventSequenceCaches( + IEventSequenceCacheFactory eventSequenceCacheFactory, + KernelConfiguration configuration) { _eventSequenceCacheFactory = eventSequenceCacheFactory; + _configuration = configuration; } /// @@ -40,6 +45,22 @@ public IEventSequenceCache GetFor(MicroserviceId microserviceId, TenantId tenant /// public bool IsUnderPressure() => _caches.Values.Any(_ => _.IsUnderPressure()); + /// + public Task PrimeAll() + { + var tasks = new List(); + + foreach (var (microserviceId, microservice) in _configuration.Microservices) + { + foreach (var (tenantId, _) in _configuration.Tenants) + { + tasks.Add(GetFor(microserviceId, tenantId, EventSequenceId.Log).PrimeWithTailWindow()); + } + } + + return Task.WhenAll(tasks.ToArray()); + } + /// public void Purge() { diff --git a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapter.cs b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapter.cs index 6c0c308c5..9da085414 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapter.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapter.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Events; using Aksio.Cratis.EventSequences; using Aksio.Cratis.Identities; @@ -17,7 +16,7 @@ namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming; /// public class EventSequenceQueueAdapter : IQueueAdapter { - readonly ConcurrentDictionary _receivers = new(); + readonly Dictionary _receivers = new(); readonly IStreamQueueMapper _mapper; readonly ProviderFor _eventSequenceStorageProvider; diff --git a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapterReceiver.cs b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapterReceiver.cs index 0da4f1e60..01560940c 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapterReceiver.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/EventSequenceQueueAdapterReceiver.cs @@ -1,7 +1,6 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Collections.Concurrent; using Aksio.Cratis.Events; using Orleans.Runtime; using Orleans.Streams; @@ -13,13 +12,13 @@ namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming; /// public class EventSequenceQueueAdapterReceiver : IQueueAdapterReceiver { - readonly ConcurrentBag _eventBatches = new(); + readonly List _eventBatches = new(); readonly List _empty = new(); /// public Task> GetQueueMessagesAsync(int maxCount) { - if (!_eventBatches.IsEmpty) + if (_eventBatches.Count != 0) { var result = _eventBatches.OrderBy(_ => _.SequenceToken).ToArray().ToList(); _eventBatches.Clear(); diff --git a/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCache.cs b/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCache.cs index a9a79bcb5..23ced4fd7 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCache.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCache.cs @@ -51,6 +51,12 @@ public interface IEventSequenceCache : IDisposable /// The sequence number to populate from. void Prime(EventSequenceNumber from); + /// + /// Prime the cache with the tail window. + /// + /// Awaitable task. + Task PrimeWithTailWindow(); + /// /// Check if the cache is under pressure. /// diff --git a/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCaches.cs b/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCaches.cs index ad2db9b0e..a7b69651b 100644 --- a/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCaches.cs +++ b/Source/Kernel/Grains/EventSequences/Streaming/IEventSequenceCaches.cs @@ -19,6 +19,12 @@ public interface IEventSequenceCaches /// The associated. IEventSequenceCache GetFor(MicroserviceId microserviceId, TenantId tenantId, EventSequenceId eventSequenceId); + /// + /// Prime all caches. + /// + /// Awaitable task. + Task PrimeAll(); + /// /// Check if any of the caches are under pressure. /// diff --git a/Source/Kernel/Server/BootProcedure.cs b/Source/Kernel/Server/BootProcedure.cs index 9f33f7de0..6e6ebdc1a 100644 --- a/Source/Kernel/Server/BootProcedure.cs +++ b/Source/Kernel/Server/BootProcedure.cs @@ -6,6 +6,7 @@ using Aksio.Cratis.Identities; using Aksio.Cratis.Kernel.Configuration; using Aksio.Cratis.Kernel.Grains.EventSequences.Inbox; +using Aksio.Cratis.Kernel.Grains.EventSequences.Streaming; using Aksio.Cratis.Kernel.Grains.Projections; namespace Aksio.Cratis.Kernel.Server; @@ -19,6 +20,7 @@ public class BootProcedure : IPerformBootProcedure readonly IExecutionContextManager _executionContextManager; readonly IGrainFactory _grainFactory; readonly KernelConfiguration _configuration; + readonly ILogger _logger; /// /// Initializes a new instance of the class. @@ -27,16 +29,19 @@ public class BootProcedure : IPerformBootProcedure /// for working with the execution context. /// for getting grains. /// The . + /// Logger for logging. public BootProcedure( IServiceProvider serviceProvider, IExecutionContextManager executionContextManager, IGrainFactory grainFactory, - KernelConfiguration configuration) + KernelConfiguration configuration, + ILogger logger) { _serviceProvider = serviceProvider; _executionContextManager = executionContextManager; _grainFactory = grainFactory; _configuration = configuration; + _logger = logger; } /// @@ -44,15 +49,23 @@ public void Perform() { _ = Task.Run(() => { + _logger.PrimingEventSequenceCaches(); + var eventSequenceCaches = _serviceProvider.GetRequiredService()!; + eventSequenceCaches.PrimeAll().Wait(); + foreach (var (microserviceId, microservice) in _configuration.Microservices) { _executionContextManager.Establish(microserviceId); + + this._logger.PopulateSchemaStore(); var schemaStore = _serviceProvider.GetRequiredService()!; schemaStore.Populate().Wait(); + this._logger.PopulateIdentityStore(); var identityStore = _serviceProvider.GetRequiredService()!; identityStore.Populate().Wait(); + _logger.RehydrateProjections(); var projections = _grainFactory.GetGrain(0); projections.Rehydrate().Wait(); diff --git a/Source/Kernel/Server/BootProcedureLogMessages.cs b/Source/Kernel/Server/BootProcedureLogMessages.cs new file mode 100644 index 000000000..675bee930 --- /dev/null +++ b/Source/Kernel/Server/BootProcedureLogMessages.cs @@ -0,0 +1,19 @@ +// Copyright (c) Aksio Insurtech. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Aksio.Cratis.Kernel.Server; + +internal static partial class BootProcedureLogMessages +{ + [LoggerMessage(0, LogLevel.Information, "Priming caches for all event sequences for all microservices and tenants")] + internal static partial void PrimingEventSequenceCaches(this ILogger logger); + + [LoggerMessage(1, LogLevel.Information, "Populating schema store")] + internal static partial void PopulateSchemaStore(this ILogger logger); + + [LoggerMessage(2, LogLevel.Information, "Populating identity store")] + internal static partial void PopulateIdentityStore(this ILogger logger); + + [LoggerMessage(3, LogLevel.Information, "Rehydrating projections")] + internal static partial void RehydrateProjections(this ILogger logger); +} diff --git a/Specifications/Kernel/Grains/EventSequences/Streaming/for_EventSequenceCaches/given/an_event_sequence_caches.cs b/Specifications/Kernel/Grains/EventSequences/Streaming/for_EventSequenceCaches/given/an_event_sequence_caches.cs index 01f798d11..a985afeda 100644 --- a/Specifications/Kernel/Grains/EventSequences/Streaming/for_EventSequenceCaches/given/an_event_sequence_caches.cs +++ b/Specifications/Kernel/Grains/EventSequences/Streaming/for_EventSequenceCaches/given/an_event_sequence_caches.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using Aksio.Cratis.EventSequences; +using Aksio.Cratis.Kernel.Configuration; namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming.for_EventSequenceCaches.given; @@ -10,6 +11,7 @@ public class an_event_sequence_caches : Specification protected Mock execution_context_manager; protected Mock event_sequence_storage_provider; protected Mock event_sequence_cache_factory; + protected KernelConfiguration configuration; protected EventSequenceCaches caches; void Establish() @@ -17,6 +19,7 @@ void Establish() execution_context_manager = new(); event_sequence_storage_provider = new(); event_sequence_cache_factory = new(); - caches = new(event_sequence_cache_factory.Object); + configuration = new(); + caches = new(event_sequence_cache_factory.Object, configuration); } } From 3f11d97ef704a4c0cea2f2204ff59963bdfa55eb Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 12:56:21 +0200 Subject: [PATCH 06/10] Moving responsibility of deciding when an even operation occurred to higher level --- ...equenceStorageProviderForSpecifications.cs | 8 ++++---- .../Grains/EventSequences/EventSequence.cs | 6 ++++-- .../Streaming/EventSequenceQueueAdapter.cs | 1 + .../MongoDBEventSequenceStorage.cs | 19 ++++++++++++------- .../EventSequences/IEventSequenceStorage.cs | 14 +++++++++----- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs b/Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs index 75d013c6a..7e871ef9e 100644 --- a/Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs +++ b/Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs @@ -104,16 +104,16 @@ public Task HasInstanceFor(EventSequenceId eventSequenceId, EventTypeId ev } /// - public Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException(); + public Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, DateTimeOffset occurred, ExpandoObject content) => throw new NotImplementedException(); /// - public Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException(); + public Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, DateTimeOffset occurred, ExpandoObject content) => throw new NotImplementedException(); /// - public Task Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable causation, IEnumerable causedByChain) => throw new NotImplementedException(); + public Task Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred) => throw new NotImplementedException(); /// - public Task> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable? eventTypes, IEnumerable causation, IEnumerable causedByChain) => throw new NotImplementedException(); + public Task> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable? eventTypes, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred) => throw new NotImplementedException(); /// public Task GetEventAt(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber) => throw new NotImplementedException(); diff --git a/Source/Kernel/Grains/EventSequences/EventSequence.cs b/Source/Kernel/Grains/EventSequences/EventSequence.cs index dd283ed34..d5e63d76c 100644 --- a/Source/Kernel/Grains/EventSequences/EventSequence.cs +++ b/Source/Kernel/Grains/EventSequences/EventSequence.cs @@ -247,7 +247,8 @@ public async Task causation, IEnumerable causedByChain, + DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) { @@ -94,7 +95,7 @@ public async Task Append( causation, causedByChain, eventType.Id, - DateTimeOffset.UtcNow, + occurred, validFrom, eventSourceId, new Dictionary @@ -134,6 +135,7 @@ public Task Compensate( EventType eventType, IEnumerable causation, IEnumerable causedByChain, + DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException(); @@ -143,7 +145,8 @@ public async Task Redact( EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable causation, - IEnumerable causedByChain) + IEnumerable causedByChain, + DateTimeOffset occurred) { _logger.Redacting(eventSequenceId, sequenceNumber); var collection = GetCollectionFor(eventSequenceId); @@ -155,7 +158,7 @@ public async Task Redact( return @event; } - var updateModel = CreateRedactionUpdateModelFor(@event, reason, causation, causedByChain); + var updateModel = CreateRedactionUpdateModelFor(@event, reason, causation, causedByChain, occurred); await collection.UpdateOneAsync(updateModel.Filter, updateModel.Update).ConfigureAwait(false); return @event; @@ -168,7 +171,8 @@ public async Task> Redact( RedactionReason reason, IEnumerable? eventTypes, IEnumerable causation, - IEnumerable causedByChain) + IEnumerable causedByChain, + DateTimeOffset occurred) { _logger.RedactingMultiple(eventSequenceId, eventSourceId, eventTypes ?? Enumerable.Empty()); var collection = GetCollectionFor(eventSequenceId); @@ -186,7 +190,7 @@ public async Task> Redact( continue; } - updates.Add(CreateRedactionUpdateModelFor(@event, reason, causation, causedByChain)); + updates.Add(CreateRedactionUpdateModelFor(@event, reason, causation, causedByChain, occurred)); affectedEventTypes.Add(@event.Metadata.Type); } @@ -407,13 +411,14 @@ UpdateOneModel CreateRedactionUpdateModelFor( AppendedEvent @event, RedactionReason reason, IEnumerable causation, - IEnumerable causedById) + IEnumerable causedById, + DateTimeOffset occurred) { var executionContext = _executionContextManager.Current; var content = new RedactionEventContent( reason, @event.Metadata.Type.Id, - DateTimeOffset.UtcNow, + occurred, executionContext.CorrelationId, causation, causedById); diff --git a/Source/Kernel/Shared/EventSequences/IEventSequenceStorage.cs b/Source/Kernel/Shared/EventSequences/IEventSequenceStorage.cs index 3ac72ead0..febcc316c 100644 --- a/Source/Kernel/Shared/EventSequences/IEventSequenceStorage.cs +++ b/Source/Kernel/Shared/EventSequences/IEventSequenceStorage.cs @@ -29,10 +29,11 @@ public interface IEventSequenceStorage /// The type of event to append. /// Collection of . /// The chain of representing the person, system or service that caused the event. - /// Optional date and time for when the compensation is valid from. + /// The date and time the event occurred. + /// Date and time for when the compensation is valid from. /// The content of the event. /// Awaitable . - Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, ExpandoObject content); + Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content); /// /// Compensate a single event to the event store. @@ -42,10 +43,11 @@ public interface IEventSequenceStorage /// The type of event to append. /// Collection of . /// The chain of representing the person, system or service that caused the event. + /// The date and time the compensation occurred. /// Optional date and time for when the compensation is valid from. /// The content of the event. /// Awaitable . - Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset validFrom, ExpandoObject content); + Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content); /// /// Redact an event at a specific sequence number. @@ -55,8 +57,9 @@ public interface IEventSequenceStorage /// Reason for redacting. /// Collection of . /// The chain of representing the person, system or service that caused the event. + /// The date and time the redaction occurred. /// Affected event. - Task Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable causation, IEnumerable causedByChain); + Task Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred); /// /// Redact all events for a specific . @@ -67,8 +70,9 @@ public interface IEventSequenceStorage /// Optionally any specific event types. /// Collection of . /// The chain of representing the person, system or service that caused the event. + /// The date and time the redaction occurred. /// Affected event types. - Task> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable? eventTypes, IEnumerable causation, IEnumerable causedByChain); + Task> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable? eventTypes, IEnumerable causation, IEnumerable causedByChain, DateTimeOffset occurred); /// /// Get the sequence number of the first event as part of the filtered event types. From f7285fd0790d16f8ce5e5d394b8db85844e08fe5 Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 15:15:45 +0200 Subject: [PATCH 07/10] Setting subscription type on state when setting current subscription --- .../Grains/Observation/ObserverWorker.cs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/Source/Kernel/Grains/Observation/ObserverWorker.cs b/Source/Kernel/Grains/Observation/ObserverWorker.cs index b22890c97..3cb764f4d 100644 --- a/Source/Kernel/Grains/Observation/ObserverWorker.cs +++ b/Source/Kernel/Grains/Observation/ObserverWorker.cs @@ -19,12 +19,22 @@ public abstract class ObserverWorker : Grain readonly ProviderFor _eventSequenceStorageProviderProvider; readonly IExecutionContextManager _executionContextManager; readonly IPersistentState _observerState; + ObserverSubscription _currentSubscription = ObserverSubscription.Unsubscribed; IObserverSupervisor? _supervisor; /// /// Gets or sets the subscriber type. /// - protected ObserverSubscription CurrentSubscription { get; set; } = ObserverSubscription.Unsubscribed; + protected ObserverSubscription CurrentSubscription + { + get => _currentSubscription; + set + { + _currentSubscription = value; + State.CurrentSubscriptionType = _currentSubscription.SubscriberType.AssemblyQualifiedName; + State.CurrentSubscriptionArguments = _currentSubscription.Arguments; + } + } /// /// Gets the . @@ -250,10 +260,5 @@ protected async Task ReadStateAsync() /// Write the observer state. /// /// Awaitable task. - protected Task WriteStateAsync() - { - State.CurrentSubscriptionType = CurrentSubscription.SubscriberType.AssemblyQualifiedName; - State.CurrentSubscriptionArguments = CurrentSubscription.Arguments; - return _observerState.WriteStateAsync(); - } + protected Task WriteStateAsync() => _observerState.WriteStateAsync(); } From 6a90c226f2479814051fc090609c445b76c2d612 Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 15:16:30 +0200 Subject: [PATCH 08/10] Using current subscription type + args when reading, since these aren't persisted --- Source/Kernel/MongoDB/Observation/ObserverStorageProvider.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Source/Kernel/MongoDB/Observation/ObserverStorageProvider.cs b/Source/Kernel/MongoDB/Observation/ObserverStorageProvider.cs index b65b9f3f7..2a01c3b38 100644 --- a/Source/Kernel/MongoDB/Observation/ObserverStorageProvider.cs +++ b/Source/Kernel/MongoDB/Observation/ObserverStorageProvider.cs @@ -80,6 +80,8 @@ public async Task ReadStateAsync(string stateName, GrainId grainId, IGrainSta RunningState = ObserverRunningState.New }; state.FailedPartitions = failedPartitions; + state.CurrentSubscriptionType = actualGrainState.State?.CurrentSubscriptionType; + state.CurrentSubscriptionArguments = actualGrainState.State?.CurrentSubscriptionArguments; actualGrainState.State = state; } From 6300bd28c464782dfd87dac76a043c56a54af85a Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 15:16:50 +0200 Subject: [PATCH 09/10] Make sure to set subscription before reading state - since reading makes decisions based on subscription --- Source/Kernel/Grains/Observation/CatchUp.cs | 3 +-- Source/Kernel/Grains/Observation/Replay.cs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Source/Kernel/Grains/Observation/CatchUp.cs b/Source/Kernel/Grains/Observation/CatchUp.cs index b4762215c..a1b95f7ad 100644 --- a/Source/Kernel/Grains/Observation/CatchUp.cs +++ b/Source/Kernel/Grains/Observation/CatchUp.cs @@ -70,10 +70,9 @@ public async Task Start(ObserverSubscription subscription) return; } - await ReadStateAsync(); - _logger.Starting(ObserverId, MicroserviceId, TenantId, EventSequenceId, SourceMicroserviceId, SourceTenantId); CurrentSubscription = subscription; + await ReadStateAsync(); _isRunning = true; _timer = RegisterTimer(PerformCatchUp, null, TimeSpan.Zero, TimeSpan.FromHours(1)); } diff --git a/Source/Kernel/Grains/Observation/Replay.cs b/Source/Kernel/Grains/Observation/Replay.cs index 8a43d7772..0f3742d48 100644 --- a/Source/Kernel/Grains/Observation/Replay.cs +++ b/Source/Kernel/Grains/Observation/Replay.cs @@ -70,10 +70,9 @@ public async Task Start(ObserverSubscription subscription) return; } - await ReadStateAsync(); - _logger.Starting(ObserverId, MicroserviceId, TenantId, EventSequenceId, SourceMicroserviceId, SourceTenantId); CurrentSubscription = subscription; + await ReadStateAsync(); _isRunning = true; _timer = RegisterTimer(PerformReplay, null, TimeSpan.Zero, TimeSpan.FromHours(1)); } From be099d8ca2537d907fea634266bd1553f7358314 Mon Sep 17 00:00:00 2001 From: Einar Ingebrigtsen Date: Fri, 25 Aug 2023 15:28:32 +0200 Subject: [PATCH 10/10] Fixing redaction to work from Workbench --- .../Domain/EventSequences/EventSequence.cs | 26 +++++++++++++++---- .../Domain/EventSequences/RedactEvent.cs | 4 +-- .../Domain/EventSequences/RedactEvents.cs | 4 +-- .../API/events/store/sequence/RedactEvent.ts | 8 +++--- .../API/events/store/sequence/RedactEvents.ts | 8 +++--- 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/Source/Kernel/Domain/EventSequences/EventSequence.cs b/Source/Kernel/Domain/EventSequences/EventSequence.cs index 464c4a89c..1390aa596 100644 --- a/Source/Kernel/Domain/EventSequences/EventSequence.cs +++ b/Source/Kernel/Domain/EventSequences/EventSequence.cs @@ -1,8 +1,10 @@ // Copyright (c) Aksio Insurtech. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Aksio.Cratis.Auditing; using Aksio.Cratis.Events; using Aksio.Cratis.EventSequences; +using Aksio.Cratis.Identities; using Aksio.Cratis.Kernel.Grains.Workers; using Microsoft.AspNetCore.Mvc; using IEventSequence = Aksio.Cratis.Kernel.Grains.EventSequences.IEventSequence; @@ -19,18 +21,26 @@ public class EventSequence : Controller { readonly IGrainFactory _grainFactory; readonly IExecutionContextManager _executionContextManager; + readonly ICausationManager _causationManager; + readonly IIdentityProvider _identityProvider; /// /// Initializes a new instance of the class. /// /// . /// . + /// The for working with causation. + /// The for getting current identity. public EventSequence( IGrainFactory grainFactory, - IExecutionContextManager executionContextManager) + IExecutionContextManager executionContextManager, + ICausationManager causationManager, + IIdentityProvider identityProvider) { _grainFactory = grainFactory; _executionContextManager = executionContextManager; + _causationManager = causationManager; + _identityProvider = identityProvider; } /// @@ -95,13 +105,16 @@ public async Task RedactEvent( [FromRoute] TenantId tenantId, [FromBody] RedactEvent redaction) { + var causation = redaction.Causation ?? _causationManager.GetCurrentChain(); + var causedBy = redaction.CausedBy ?? _identityProvider.GetCurrent(); + _executionContextManager.Establish(tenantId, _executionContextManager.Current.CorrelationId, microserviceId); var eventSequence = GetEventSequence(microserviceId, eventSequenceId, tenantId); var worker = await eventSequence.Redact( redaction.SequenceNumber, redaction.Reason, - redaction.Causation, - redaction.CausedBy); + causation, + causedBy); await worker.WaitForResult(); } @@ -120,14 +133,17 @@ public async Task RedactEvents( [FromRoute] TenantId tenantId, [FromBody] RedactEvents redaction) { + var causation = redaction.Causation ?? _causationManager.GetCurrentChain(); + var causedBy = redaction.CausedBy ?? _identityProvider.GetCurrent(); + _executionContextManager.Establish(tenantId, _executionContextManager.Current.CorrelationId, microserviceId); var eventSequence = GetEventSequence(microserviceId, eventSequenceId, tenantId); var worker = await eventSequence.Redact( redaction.EventSourceId, redaction.Reason, redaction.EventTypes.Select(_ => new EventType(_, EventGeneration.Unspecified)).ToArray(), - redaction.Causation, - redaction.CausedBy); + causation, + causedBy); await worker.WaitForResult(); } diff --git a/Source/Kernel/Domain/EventSequences/RedactEvent.cs b/Source/Kernel/Domain/EventSequences/RedactEvent.cs index 87d5ca83a..05901670f 100644 --- a/Source/Kernel/Domain/EventSequences/RedactEvent.cs +++ b/Source/Kernel/Domain/EventSequences/RedactEvent.cs @@ -17,5 +17,5 @@ namespace Aksio.Cratis.Kernel.Domain.EventSequences; public record RedactEvent( EventSequenceNumber SequenceNumber, RedactionReason Reason, - IEnumerable Causation, - Identity CausedBy); + IEnumerable? Causation, + Identity? CausedBy); diff --git a/Source/Kernel/Domain/EventSequences/RedactEvents.cs b/Source/Kernel/Domain/EventSequences/RedactEvents.cs index 7f896dbc5..4a43410ec 100644 --- a/Source/Kernel/Domain/EventSequences/RedactEvents.cs +++ b/Source/Kernel/Domain/EventSequences/RedactEvents.cs @@ -19,5 +19,5 @@ public record RedactEvents( EventSourceId EventSourceId, RedactionReason Reason, IEnumerable EventTypes, - IEnumerable Causation, - Identity CausedBy); + IEnumerable? Causation, + Identity? CausedBy); diff --git a/Source/Workbench/API/events/store/sequence/RedactEvent.ts b/Source/Workbench/API/events/store/sequence/RedactEvent.ts index 32d1f40d5..7e22a370a 100644 --- a/Source/Workbench/API/events/store/sequence/RedactEvent.ts +++ b/Source/Workbench/API/events/store/sequence/RedactEvent.ts @@ -16,7 +16,7 @@ export interface IRedactEvent { tenantId?: string; sequenceNumber?: number; reason?: string; - causation?: Causation[]; + causation?: Causation; causedBy?: Identity; } @@ -42,7 +42,7 @@ export class RedactEvent extends Command implements IRedactEvent { private _tenantId!: string; private _sequenceNumber!: number; private _reason!: string; - private _causation!: Causation[]; + private _causation!: Causation; private _causedBy!: Identity; constructor() { @@ -109,11 +109,11 @@ export class RedactEvent extends Command implements IRedactEvent { this._reason = value; this.propertyChanged('reason'); } - get causation(): Causation[] { + get causation(): Causation { return this._causation; } - set causation(value: Causation[]) { + set causation(value: Causation) { this._causation = value; this.propertyChanged('causation'); } diff --git a/Source/Workbench/API/events/store/sequence/RedactEvents.ts b/Source/Workbench/API/events/store/sequence/RedactEvents.ts index 73bfa59be..19ed418e5 100644 --- a/Source/Workbench/API/events/store/sequence/RedactEvents.ts +++ b/Source/Workbench/API/events/store/sequence/RedactEvents.ts @@ -17,7 +17,7 @@ export interface IRedactEvents { eventSourceId?: string; reason?: string; eventTypes?: string[]; - causation?: Causation[]; + causation?: Causation; causedBy?: Identity; } @@ -45,7 +45,7 @@ export class RedactEvents extends Command implements IRedactEvent private _eventSourceId!: string; private _reason!: string; private _eventTypes!: string[]; - private _causation!: Causation[]; + private _causation!: Causation; private _causedBy!: Identity; constructor() { @@ -121,11 +121,11 @@ export class RedactEvents extends Command implements IRedactEvent this._eventTypes = value; this.propertyChanged('eventTypes'); } - get causation(): Causation[] { + get causation(): Causation { return this._causation; } - set causation(value: Causation[]) { + set causation(value: Causation) { this._causation = value; this.propertyChanged('causation'); }