Skip to content

Commit

Permalink
Merge pull request #935 from aksio-insurtech:fix/regressions
Browse files Browse the repository at this point in the history
Fix/regressions
  • Loading branch information
einari authored Aug 25, 2023
2 parents 8596971 + be099d8 commit 9dd116e
Show file tree
Hide file tree
Showing 28 changed files with 179 additions and 90 deletions.
4 changes: 2 additions & 2 deletions Source/Clients/DotNET/EventSequences/IEventSequence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public interface IEventSequence
Task Append(EventSourceId eventSourceId, object @event, DateTimeOffset? validFrom = default);

/// <summary>
/// Append a single event to the event store.
/// Append a collection of events to the event store.
/// </summary>
/// <param name="eventSourceId">The <see cref="EventSourceId"/> to append for.</param>
/// <param name="events">Collection of events to append.</param>
/// <returns>Awaitable <see cref="Task"/>.</returns>
Task AppendMany(EventSourceId eventSourceId, IEnumerable<object> events);

/// <summary>
/// Append a single event to the event store.
/// Append a collection of events to the event store with valid from per event.
/// </summary>
/// <param name="eventSourceId">The <see cref="EventSourceId"/> to append for.</param>
/// <param name="events">Collection of events with valid from to append.</param>
Expand Down
7 changes: 1 addition & 6 deletions Source/Clients/DotNET/Rules/RulesProjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ namespace Aksio.Cratis.Rules;
[Singleton]
public class RulesProjections : IRulesProjections
{
sealed class RulesClass : RulesFor<RulesClass, object>
{
public override RuleId Identifier => Guid.Empty;
}

readonly IEventTypes _eventTypes;
readonly IModelNameConvention _modelNameConvention;
readonly IJsonSchemaGenerator _jsonSchemaGenerator;
Expand Down Expand Up @@ -86,7 +81,7 @@ ProjectionDefinition CreateProjection<TTarget>(IRule rule)

var ruleType = typeof(TTarget);

var defineStateMethod = ruleType.GetMethod(nameof(RulesClass.DefineState), BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var defineStateMethod = ruleType.GetMethod("DefineState", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
if (defineStateMethod is not null)
{
var parameters = defineStateMethod.GetParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ public Task<bool> HasInstanceFor(EventSequenceId eventSequenceId, EventTypeId ev
}

/// <inheritdoc/>
public Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();
public Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset validFrom, DateTimeOffset occurred, ExpandoObject content) => throw new NotImplementedException();

Check failure on line 107 in Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Parameter name 'validFrom' differs from base name 'occurred'. (http://pihrt.net/roslynator/analyzer?id=RCS1168)

Check failure on line 107 in Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Parameter name 'occurred' differs from base name 'validFrom'. (http://pihrt.net/roslynator/analyzer?id=RCS1168)

/// <inheritdoc/>
public Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();
public Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset validFrom, DateTimeOffset occurred, ExpandoObject content) => throw new NotImplementedException();

Check failure on line 110 in Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Parameter name 'validFrom' differs from base name 'occurred'. (http://pihrt.net/roslynator/analyzer?id=RCS1168)

Check failure on line 110 in Source/Clients/Specifications/EventSequenceStorageProviderForSpecifications.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Parameter name 'occurred' differs from base name 'validFrom'. (http://pihrt.net/roslynator/analyzer?id=RCS1168)

/// <inheritdoc/>
public Task<AppendedEvent> Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain) => throw new NotImplementedException();
public Task<AppendedEvent> Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<IEnumerable<EventType>> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable<EventType>? eventTypes, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain) => throw new NotImplementedException();
public Task<IEnumerable<EventType>> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable<EventType>? eventTypes, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<AppendedEvent> GetEventAt(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber) => throw new NotImplementedException();
Expand Down
26 changes: 21 additions & 5 deletions Source/Kernel/Domain/EventSequences/EventSequence.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Aksio.Cratis.Auditing;
using Aksio.Cratis.Events;
using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Identities;
using Aksio.Cratis.Kernel.Grains.Workers;
using Microsoft.AspNetCore.Mvc;
using IEventSequence = Aksio.Cratis.Kernel.Grains.EventSequences.IEventSequence;
Expand All @@ -19,18 +21,26 @@ public class EventSequence : Controller
{
readonly IGrainFactory _grainFactory;
readonly IExecutionContextManager _executionContextManager;
readonly ICausationManager _causationManager;
readonly IIdentityProvider _identityProvider;

/// <summary>
/// Initializes a new instance of the <see cref="EventSequence"/> class.
/// </summary>
/// <param name="grainFactory"><see cref="IGrainFactory"/>.</param>
/// <param name="executionContextManager"><see cref="IExecutionContextManager"/>.</param>
/// <param name="causationManager">The <see cref="ICausationManager"/> for working with causation.</param>
/// <param name="identityProvider">The <see cref="IIdentityProvider"/> for getting current identity.</param>
public EventSequence(
IGrainFactory grainFactory,
IExecutionContextManager executionContextManager)
IExecutionContextManager executionContextManager,
ICausationManager causationManager,
IIdentityProvider identityProvider)
{
_grainFactory = grainFactory;
_executionContextManager = executionContextManager;
_causationManager = causationManager;
_identityProvider = identityProvider;
}

/// <summary>
Expand Down Expand Up @@ -95,13 +105,16 @@ public async Task RedactEvent(
[FromRoute] TenantId tenantId,
[FromBody] RedactEvent redaction)
{
var causation = redaction.Causation ?? _causationManager.GetCurrentChain();
var causedBy = redaction.CausedBy ?? _identityProvider.GetCurrent();

_executionContextManager.Establish(tenantId, _executionContextManager.Current.CorrelationId, microserviceId);
var eventSequence = GetEventSequence(microserviceId, eventSequenceId, tenantId);
var worker = await eventSequence.Redact(
redaction.SequenceNumber,
redaction.Reason,
redaction.Causation,
redaction.CausedBy);
causation,
causedBy);
await worker.WaitForResult();
}

Expand All @@ -120,14 +133,17 @@ public async Task RedactEvents(
[FromRoute] TenantId tenantId,
[FromBody] RedactEvents redaction)
{
var causation = redaction.Causation ?? _causationManager.GetCurrentChain();
var causedBy = redaction.CausedBy ?? _identityProvider.GetCurrent();

_executionContextManager.Establish(tenantId, _executionContextManager.Current.CorrelationId, microserviceId);
var eventSequence = GetEventSequence(microserviceId, eventSequenceId, tenantId);
var worker = await eventSequence.Redact(
redaction.EventSourceId,
redaction.Reason,
redaction.EventTypes.Select(_ => new EventType(_, EventGeneration.Unspecified)).ToArray(),
redaction.Causation,
redaction.CausedBy);
causation,
causedBy);
await worker.WaitForResult();
}

Expand Down
4 changes: 2 additions & 2 deletions Source/Kernel/Domain/EventSequences/RedactEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ namespace Aksio.Cratis.Kernel.Domain.EventSequences;
public record RedactEvent(
EventSequenceNumber SequenceNumber,
RedactionReason Reason,
IEnumerable<Causation> Causation,
Identity CausedBy);
IEnumerable<Causation>? Causation,
Identity? CausedBy);
4 changes: 2 additions & 2 deletions Source/Kernel/Domain/EventSequences/RedactEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ public record RedactEvents(
EventSourceId EventSourceId,
RedactionReason Reason,
IEnumerable<EventTypeId> EventTypes,
IEnumerable<Causation> Causation,
Identity CausedBy);
IEnumerable<Causation>? Causation,
Identity? CausedBy);
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.Projections;
using Aksio.Cratis.Projections.Definitions;
using Aksio.Cratis.Projections.Json;
Expand All @@ -16,7 +15,7 @@ public class ProjectionDefinitions : IProjectionDefinitions
{
readonly IProjectionDefinitionsStorage _storage;
readonly IJsonProjectionSerializer _projectionSerializer;
readonly ConcurrentDictionary<ProjectionId, ProjectionDefinition> _definitions = new();
readonly Dictionary<ProjectionId, ProjectionDefinition> _definitions = new();

/// <summary>
/// Initializes a new instance of the <see cref="ProjectionDefinition"/> class.
Expand Down Expand Up @@ -87,7 +86,7 @@ void ThrowIfMissingProjectionDefinition(ProjectionId identifier)

async Task PopulateIfEmpty()
{
if (_definitions.IsEmpty)
if (_definitions.Count == 0)
{
await Populate();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.Projections;
using Aksio.Cratis.Projections.Definitions;

Expand All @@ -14,7 +13,7 @@ namespace Aksio.Cratis.Kernel.Engines.Projections.Definitions;
public class ProjectionPipelineDefinitions : IProjectionPipelineDefinitions
{
readonly IProjectionPipelineDefinitionsStorage _storage;
readonly ConcurrentDictionary<ProjectionId, ProjectionPipelineDefinition> _definitions = new();
readonly Dictionary<ProjectionId, ProjectionPipelineDefinition> _definitions = new();

/// <summary>
/// Initializes a new instance of the <see cref="ProjectionDefinition"/> class.
Expand Down Expand Up @@ -65,7 +64,7 @@ async Task PopulateIfMissing(ProjectionId projectionId)

async Task PopulateIfEmpty()
{
if (_definitions.IsEmpty)
if (_definitions.Count == 0)
{
await Populate();
}
Expand Down
5 changes: 2 additions & 3 deletions Source/Kernel/Engines/Projections/ProjectionManager.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.Kernel.Engines.Projections.Pipelines;
using Aksio.Cratis.Projections;
using Aksio.Cratis.Projections.Definitions;
Expand All @@ -16,8 +15,8 @@ public class ProjectionManager : IProjectionManager
{
readonly IProjectionFactory _projectionFactory;
readonly IProjectionPipelineFactory _projectionPipelineFactory;
readonly ConcurrentDictionary<ProjectionId, IProjection> _projections = new();
readonly ConcurrentDictionary<ProjectionId, IProjectionPipeline> _pipelines = new();
readonly Dictionary<ProjectionId, IProjection> _projections = new();
readonly Dictionary<ProjectionId, IProjectionPipeline> _pipelines = new();

/// <summary>
/// Initializes a new instance of the <see cref="ProjectionManager"/> class.
Expand Down
3 changes: 1 addition & 2 deletions Source/Kernel/Engines/Projections/ProjectionSinks.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.Projections;
using Aksio.Types;

Expand All @@ -16,7 +15,7 @@ public class ProjectionSinks : IProjectionSinks
sealed record Key(ProjectionSinkTypeId TypeId, ProjectionId ProjectionId);

readonly IDictionary<ProjectionSinkTypeId, IProjectionSinkFactory> _factories;
readonly ConcurrentDictionary<Key, IProjectionSink> _stores = new();
readonly Dictionary<Key, IProjectionSink> _stores = new();

/// <summary>
/// Initializes a new instance of the <see cref="ProjectionSinks"/> class.
Expand Down
6 changes: 4 additions & 2 deletions Source/Kernel/Grains/EventSequences/EventSequence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public async Task<IWorker<RewindPartitionForObserversAfterRedactRequest, RewindP
sequenceNumber,
reason,
causation,
await _identityStoreProvider().GetFor(causedBy));
await _identityStoreProvider().GetFor(causedBy),
DateTimeOffset.UtcNow);
return await RewindPartitionForAffectedObservers(affectedEvent.Context.EventSourceId, sequenceNumber, new[] { affectedEvent.Metadata.Type });
}

Expand All @@ -272,7 +273,8 @@ public async Task<IWorker<RewindPartitionForObserversAfterRedactRequest, RewindP
reason,
eventTypes,
causation,
await _identityStoreProvider().GetFor(causedBy));
await _identityStoreProvider().GetFor(causedBy),
DateTimeOffset.UtcNow);
return await RewindPartitionForAffectedObservers(eventSourceId, EventSequenceNumber.First, affectedEventTypes);
}

Expand Down
21 changes: 10 additions & 11 deletions Source/Kernel/Grains/EventSequences/Streaming/EventSequenceCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public EventSequenceCache(
_executionContextManager = executionContextManager;
_eventSequenceStorageProvider = eventSequenceStorageProvider;
_logger = logger;

PreFillWithTailWindow();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -155,6 +153,16 @@ public void Purge()
}
}

/// <inheritdoc/>
public async Task PrimeWithTailWindow()
{
_executionContextManager.Establish(_tenantId, CorrelationId.New(), _microserviceId);
var tail = await _eventSequenceStorageProvider().GetTailSequenceNumber(_eventSequenceId);
tail -= NumberOfEventsToFetch;
if ((long)tail.Value < 0) tail = 0;
Prime(tail);
}

void AddImplementation(AppendedEvent @event)
{
if (_eventsBySequenceNumber.ContainsKey(@event.Metadata.SequenceNumber))
Expand Down Expand Up @@ -192,13 +200,4 @@ void RelievePressure()
}
}
}

void PreFillWithTailWindow()
{
_executionContextManager.Establish(_tenantId, CorrelationId.New(), _microserviceId);
var tail = _eventSequenceStorageProvider().GetTailSequenceNumber(_eventSequenceId).GetAwaiter().GetResult();
tail -= NumberOfEventsToFetch;
if ((long)tail.Value < 0) tail = 0;
Prime(tail);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Kernel.Configuration;

namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming;

Expand All @@ -12,16 +12,21 @@ namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming;
[Singleton]
public class EventSequenceCaches : IEventSequenceCaches
{
readonly ConcurrentDictionary<(MicroserviceId, TenantId, EventSequenceId), IEventSequenceCache> _caches = new();
readonly Dictionary<(MicroserviceId, TenantId, EventSequenceId), IEventSequenceCache> _caches = new();
readonly IEventSequenceCacheFactory _eventSequenceCacheFactory;
readonly KernelConfiguration _configuration;

/// <summary>
/// Initializes a new instance of the <see cref="EventSequenceCaches"/> class.
/// </summary>
/// <param name="eventSequenceCacheFactory"><see cref="IEventSequenceCacheFactory"/> for creating <see cref="IEventSequenceCache"/> instances.</param>
public EventSequenceCaches(IEventSequenceCacheFactory eventSequenceCacheFactory)
/// <param name="configuration">The <see cref="KernelConfiguration"/>.</param>///
public EventSequenceCaches(
IEventSequenceCacheFactory eventSequenceCacheFactory,
KernelConfiguration configuration)
{
_eventSequenceCacheFactory = eventSequenceCacheFactory;
_configuration = configuration;
}

/// <inheritdoc/>
Expand All @@ -40,6 +45,22 @@ public IEventSequenceCache GetFor(MicroserviceId microserviceId, TenantId tenant
/// <inheritdoc/>
public bool IsUnderPressure() => _caches.Values.Any(_ => _.IsUnderPressure());

/// <inheritdoc/>
public Task PrimeAll()
{
var tasks = new List<Task>();

foreach (var (microserviceId, microservice) in _configuration.Microservices)
{
foreach (var (tenantId, _) in _configuration.Tenants)
{
tasks.Add(GetFor(microserviceId, tenantId, EventSequenceId.Log).PrimeWithTailWindow());
}
}

return Task.WhenAll(tasks.ToArray());
}

/// <inheritdoc/>
public void Purge()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Aksio Insurtech. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using Aksio.Cratis.Events;
using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Identities;
Expand All @@ -17,7 +16,7 @@ namespace Aksio.Cratis.Kernel.Grains.EventSequences.Streaming;
/// </summary>
public class EventSequenceQueueAdapter : IQueueAdapter
{
readonly ConcurrentDictionary<QueueId, EventSequenceQueueAdapterReceiver> _receivers = new();
readonly Dictionary<QueueId, EventSequenceQueueAdapterReceiver> _receivers = new();

readonly IStreamQueueMapper _mapper;
readonly ProviderFor<IEventSequenceStorage> _eventSequenceStorageProvider;
Expand Down Expand Up @@ -75,6 +74,7 @@ await _eventSequenceStorageProvider().Append(
appendedEvent.Metadata.Type,
appendedEvent.Context.Causation,
await _identityStoreProvider().GetFor(appendedEvent.Context.CausedBy),
DateTimeOffset.UtcNow,
appendedEvent.Context.ValidFrom,
appendedEvent.Content);

Expand Down
Loading

0 comments on commit 9dd116e

Please sign in to comment.