diff --git a/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserId.cs b/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserId.cs new file mode 100644 index 000000000..27e6a20b2 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserId.cs @@ -0,0 +1,10 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; + +public record UserId(Guid Value) : ConceptAs(Value) +{ + public static implicit operator Guid(UserId value) => value.Value; + public static implicit operator UserId(Guid value) => new(value); +} \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserName.cs b/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserName.cs new file mode 100644 index 000000000..1400ca301 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Concepts/UserName.cs @@ -0,0 +1,10 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; + +public record UserName(string Value) : ConceptAs(Value) +{ + public static implicit operator string(UserName value) => value.Value; + public static implicit operator UserName(string value) => new(value); +} \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/Domain.Interfaces/IUser.cs b/Integration/Orleans.InProcess/AggregateRoots/Domain.Interfaces/IUser.cs new file mode 100644 index 000000000..278418d83 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Domain.Interfaces/IUser.cs @@ -0,0 +1,15 @@ +using Cratis.Chronicle.Aggregates; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; +using IAggregateRoot = Cratis.Chronicle.Orleans.Aggregates.IAggregateRoot; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain.Interfaces; + +public record UserInternalState(StateProperty Name, StateProperty Deleted); + +public interface IUser : IIntegrationTestAggregateRoot +{ + Task Onboard(UserName name); + Task Delete(); + Task ChangeUserName(UserName newName); + Task Exists(); +} diff --git a/Integration/Orleans.InProcess/AggregateRoots/Domain/User.cs b/Integration/Orleans.InProcess/AggregateRoots/Domain/User.cs new file mode 100644 index 000000000..f10d4d721 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Domain/User.cs @@ -0,0 +1,42 @@ +using Cratis.Chronicle.Aggregates; +using Cratis.Chronicle.Concepts.Events; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain.Interfaces; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; +using AggregateRoot = Cratis.Chronicle.Orleans.Aggregates.AggregateRoot; +using IAggregateRootFactory = Cratis.Chronicle.Orleans.Aggregates.IAggregateRootFactory; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain; + +public class User(IAggregateRootFactory aggregateRootFactory) : AggregateRoot, IUser +{ + public class UserDeleted(EventSourceId id) : InvalidOperationException($"User {id} is deleted"); + + public StateProperty Name = StateProperty.Empty; + public StateProperty Deleted = StateProperty.Empty; + + public Task Onboard(UserName name) => ApplyIfNotDeleted(new UserOnBoarded(name)); + public Task Delete() => ApplyIfNotDeleted(new Events.UserDeleted()); + public Task ChangeUserName(UserName newName) => + Name.Value == newName ? Task.CompletedTask : ApplyIfNotDeleted(new UserNameChanged(newName)); + + public Task Exists() => Task.FromResult(!IsNew && !Deleted.Value); + public Task GetCorrelationId() => Task.FromResult(Context!.UnitOfWOrk.CorrelationId); + public Task GetIsNew() => Task.FromResult(IsNew); + + Task ApplyIfNotDeleted(object evt) => Deleted.Value ? throw new UserDeleted(IdentityString) : Apply(evt); + + public Task On(UserOnBoarded evt) + { + Name = Name.New(evt.Name); + return Task.CompletedTask; + } + + public Task On(UserDeleted evt) + { + Deleted = Deleted.New(true); + return Task.CompletedTask; + } + + public Task GetState() => Task.FromResult(new UserInternalState(Name, Deleted)); +} diff --git a/Integration/Orleans.InProcess/AggregateRoots/EventAndEventSourceId.cs b/Integration/Orleans.InProcess/AggregateRoots/EventAndEventSourceId.cs new file mode 100644 index 000000000..44d284bb6 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/EventAndEventSourceId.cs @@ -0,0 +1,8 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Events; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots; + +public record EventAndEventSourceId(EventSourceId EventSourceId, object Event); \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/Events/UserDeleted.cs b/Integration/Orleans.InProcess/AggregateRoots/Events/UserDeleted.cs new file mode 100644 index 000000000..8229fd52c --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Events/UserDeleted.cs @@ -0,0 +1,6 @@ +using Cratis.Chronicle.Events; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; + +[EventType] +public record UserDeleted; diff --git a/Integration/Orleans.InProcess/AggregateRoots/Events/UserNameChanged.cs b/Integration/Orleans.InProcess/AggregateRoots/Events/UserNameChanged.cs new file mode 100644 index 000000000..336a12353 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Events/UserNameChanged.cs @@ -0,0 +1,9 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Events; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; +[EventType] +public record UserNameChanged(UserName NewName); \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/Events/UserOnboarded.cs b/Integration/Orleans.InProcess/AggregateRoots/Events/UserOnboarded.cs new file mode 100644 index 000000000..0678d1e1b --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Events/UserOnboarded.cs @@ -0,0 +1,7 @@ +using Cratis.Chronicle.Events; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; + +[EventType] +public record UserOnBoarded(UserName Name); \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/IIntegrationTestAggregateRoot.cs b/Integration/Orleans.InProcess/AggregateRoots/IIntegrationTestAggregateRoot.cs new file mode 100644 index 000000000..b52a82402 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/IIntegrationTestAggregateRoot.cs @@ -0,0 +1,14 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Orleans.Aggregates; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots; + +public interface IIntegrationTestAggregateRoot : IAggregateRoot + where TInternalState : class +{ + Task GetState(); + Task GetCorrelationId(); + Task GetIsNew(); +} \ No newline at end of file diff --git a/Integration/Orleans.InProcess/AggregateRoots/Scenarios/given/context_for_aggregate_root.cs b/Integration/Orleans.InProcess/AggregateRoots/Scenarios/given/context_for_aggregate_root.cs new file mode 100644 index 000000000..46dbcbf73 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Scenarios/given/context_for_aggregate_root.cs @@ -0,0 +1,62 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Events; +using Cratis.Chronicle.Integration.Base; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain.Interfaces; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; +using Cratis.Chronicle.Orleans.Aggregates; +using Cratis.Chronicle.Transactions; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Scenarios.given; + +public class context_for_aggregate_root(GlobalFixture globalFixture) : IntegrationSpecificationContext(globalFixture) + where TAggregate : IIntegrationTestAggregateRoot + where TInternalState : class +{ +#pragma warning disable CA2213 // Disposable fields should be disposed + protected GlobalFixture _globalFixture = globalFixture; +#pragma warning restore CA2213 // Disposable fields should be disposed + + public TInternalState ResultState; + public bool IsNew; + public IUnitOfWork UnitOfWork; + + public override IEnumerable AggregateRoots => [typeof(User)]; + public override IEnumerable EventTypes => [typeof(UserOnBoarded), typeof(UserDeleted), typeof(UserNameChanged)]; + + protected List EventsWithEventSourceIdToAppend = []; + protected IAggregateRootFactory AggregateRootFactory => Services.GetRequiredService(); + protected IUnitOfWorkManager UnitOfWorkManager => Services.GetRequiredService(); + + protected override void ConfigureServices(IServiceCollection services) + { + } + + protected async Task DoOnAggregate(EventSourceId eventSourceId, Func action, bool commitUnitOfWork = true) + { + var user = await AggregateRootFactory.Get(eventSourceId); + IsNew = await user.GetIsNew(); + await action(user); + var correlationId = await user.GetCorrelationId(); + ResultState = await user.GetState(); + UnitOfWorkManager.TryGetFor(correlationId, out UnitOfWork); + if (commitUnitOfWork) + { + await UnitOfWork!.Commit(); + } + } + + void Establish() + { + } + + async Task Because() + { + foreach (var @event in EventsWithEventSourceIdToAppend) + { + await EventStore.EventLog.Append(@event.EventSourceId, @event.Event); + } + } +} diff --git a/Integration/Orleans.InProcess/AggregateRoots/Scenarios/when_there_are_events_to_rehydrate/and_performing_no_action_on_aggregate.cs b/Integration/Orleans.InProcess/AggregateRoots/Scenarios/when_there_are_events_to_rehydrate/and_performing_no_action_on_aggregate.cs new file mode 100644 index 000000000..31f982839 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/Scenarios/when_there_are_events_to_rehydrate/and_performing_no_action_on_aggregate.cs @@ -0,0 +1,53 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Aggregates; +using Cratis.Chronicle.Events; +using Cratis.Chronicle.Integration.Base; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Concepts; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Domain.Interfaces; +using Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Events; +using Cratis.Chronicle.Transactions; +using context = Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Scenarios.when_there_are_events_to_rehydrate.and_performing_no_action_on_aggregate.context; + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots.Scenarios.when_there_are_events_to_rehydrate; + +[Collection(GlobalCollection.Name)] +public class and_performing_no_action_on_aggregate(context context) : Given(context) +{ + public class context(GlobalFixture globalFixture) : given.context_for_aggregate_root(globalFixture) + { + UserId _userId; + public UserName UserName; + + public bool UserExists; + + void Establish() + { + _userId = Guid.NewGuid(); + UserName = "some name"; + EventsWithEventSourceIdToAppend.Add(new EventAndEventSourceId(_userId.Value, new UserOnBoarded(UserName))); + } + + Task Because() => DoOnAggregate(_userId.Value, async user => UserExists = await user.Exists()); + } + + [Fact] + void should_not_be_new_aggregate() => Context.IsNew.ShouldBeFalse(); + + [Fact] + void should_return_that_user_exists() => Context.UserExists.ShouldBeTrue(); + + [Fact] + void should_commit_unit_of_work_successfully() => Context.UnitOfWork.IsSuccess.ShouldBeTrue(); + + [Fact] + void should_not_commit_any_events() => Context.UnitOfWork.GetEvents().ShouldBeEmpty(); + + [Fact] + void should_not_be_deleted() => Context.ResultState.Deleted.ShouldEqual(new StateProperty(false, 0)); + + [Fact] + void should_have_assigned_the_username_once() => Context.ResultState.Name.ShouldEqual(new StateProperty(Context.UserName, 1)); +} diff --git a/Integration/Orleans.InProcess/AggregateRoots/StateProperty.cs b/Integration/Orleans.InProcess/AggregateRoots/StateProperty.cs new file mode 100644 index 000000000..4fb6016c7 --- /dev/null +++ b/Integration/Orleans.InProcess/AggregateRoots/StateProperty.cs @@ -0,0 +1,11 @@ + +namespace Cratis.Chronicle.Integration.Orleans.InProcess.AggregateRoots; + +public record StateProperty(TValue Value, int CallCount) +{ + public static StateProperty Empty => new(default, 0); + + public StateProperty New(TValue value) => IncrementCallCount() with { Value = value }; + + public StateProperty IncrementCallCount() => this with { CallCount = CallCount + 1 }; +} \ No newline at end of file diff --git a/Integration/Orleans.InProcess/Orleans.InProcess.csproj b/Integration/Orleans.InProcess/Orleans.InProcess.csproj index 051cb4621..c49321b6a 100644 --- a/Integration/Orleans.InProcess/Orleans.InProcess.csproj +++ b/Integration/Orleans.InProcess/Orleans.InProcess.csproj @@ -18,6 +18,9 @@ + + + diff --git a/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilter.cs b/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilter.cs index 3b623b564..1a04f60bb 100644 --- a/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilter.cs +++ b/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilter.cs @@ -4,6 +4,7 @@ using Cratis.Chronicle.Events.Constraints; using Cratis.Chronicle.Transactions; using Microsoft.AspNetCore.Mvc.Filters; +using Microsoft.Extensions.Logging; namespace Cratis.Chronicle.AspNetCore.Transactions; @@ -11,7 +12,8 @@ namespace Cratis.Chronicle.AspNetCore.Transactions; /// Represents an implementation of for managing units of work and errors. /// /// The to use. -public class UnitOfWorkActionFilter(IUnitOfWorkManager unitOfWorkManager) : IAsyncActionFilter +/// The . +public class UnitOfWorkActionFilter(IUnitOfWorkManager unitOfWorkManager, ILogger logger) : IAsyncActionFilter { /// public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionExecutionDelegate next) @@ -19,7 +21,15 @@ public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionE await next(); var unitOfWork = unitOfWorkManager.Current; - await unitOfWork.Commit(); + + if (!unitOfWork.IsCompleted) + { + await unitOfWork.Commit(); + } + else + { + logger.AlreadyCompletedManually(unitOfWork.CorrelationId); + } if (!unitOfWork.IsSuccess) { foreach (var violation in unitOfWork.GetConstraintViolations()) @@ -29,4 +39,4 @@ public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionE } } } -} +} \ No newline at end of file diff --git a/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilterLogging.cs b/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilterLogging.cs new file mode 100644 index 000000000..b726074ba --- /dev/null +++ b/Source/Clients/AspNetCore/Transactions/UnitOfWorkActionFilterLogging.cs @@ -0,0 +1,13 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Execution; +using Microsoft.Extensions.Logging; + +namespace Cratis.Chronicle.AspNetCore.Transactions; + +internal static partial class UnitOfWorkActionFilterLogging +{ + [LoggerMessage(LogLevel.Warning, "The unit of work for correlation {CorrelationId} has already been completed manually either by Commit or Rollback. It is recommended to not complete the unit of work manually in this context.")] + internal static partial void AlreadyCompletedManually(this ILogger logger, CorrelationId correlationId); +} \ No newline at end of file diff --git a/Source/Clients/AspNetCore/Transactions/UnitOfWorkMiddleware.cs b/Source/Clients/AspNetCore/Transactions/UnitOfWorkMiddleware.cs index 584c0e04c..e2a65e5d8 100644 --- a/Source/Clients/AspNetCore/Transactions/UnitOfWorkMiddleware.cs +++ b/Source/Clients/AspNetCore/Transactions/UnitOfWorkMiddleware.cs @@ -21,7 +21,18 @@ public class UnitOfWorkMiddleware(IUnitOfWorkManager unitOfWorkManager, RequestD /// Awaitable task. public async Task InvokeAsync(HttpContext context) { - unitOfWorkManager.Begin(CorrelationId.New()); - await next(context); + var unitOfWork = unitOfWorkManager.Begin(CorrelationId.New()); + try + { + await next(context); + } + catch + { + if (!unitOfWork.IsCompleted) + { + unitOfWork.Dispose(); + } + throw; + } } } diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateful_aggregate_root.cs b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateful_aggregate_root.cs index 484a79709..b3207096e 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateful_aggregate_root.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateful_aggregate_root.cs @@ -21,7 +21,7 @@ void Establish() _unitOfWork = Substitute.For(); - _aggregateRootContext = new AggregateRootContext(_eventSourceId, _eventSequence, _aggregateRoot, _unitOfWork); + _aggregateRootContext = new AggregateRootContext(_eventSourceId, _eventSequence, _aggregateRoot, _unitOfWork, EventSequenceNumber.First); _aggregateRoot._context = _aggregateRootContext; _aggregateRoot._mutation = _mutation; } diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateless_aggregate_root.cs b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateless_aggregate_root.cs index 8b1492062..384df25b5 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateless_aggregate_root.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRoot/given/a_stateless_aggregate_root.cs @@ -18,7 +18,7 @@ void Establish() _aggregateRoot = new(); _eventSourceId = Guid.NewGuid().ToString(); _unitOfWork = Substitute.For(); - _aggregateRootContext = new AggregateRootContext(_eventSourceId, _eventSequence, _aggregateRoot, _unitOfWork); + _aggregateRootContext = new AggregateRootContext(_eventSourceId, _eventSequence, _aggregateRoot, _unitOfWork, EventSequenceNumber.First); _aggregateRoot._context = _aggregateRootContext; _aggregateRoot._mutation = _mutation; diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateful_aggregate_root.cs b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateful_aggregate_root.cs index a1a85ddcb..8c06b33d6 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateful_aggregate_root.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateful_aggregate_root.cs @@ -16,7 +16,8 @@ public class when_creating_for_stateful_aggregate_root : given.an_aggregate_root EventSourceId.New(), Substitute.For(), new StatefulAggregateRoot(), - Substitute.For()); + Substitute.For(), + EventSequenceNumber.First); async Task Because() => _result = await _factory.Create(_context); diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateless_aggregate_root.cs b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateless_aggregate_root.cs index 8c80b84a5..115b01c92 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateless_aggregate_root.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_AggregateRootMutatorFactory/when_creating_for_stateless_aggregate_root.cs @@ -16,7 +16,8 @@ public class when_creating_for_stateless_aggregate_root : given.an_aggregate_roo EventSourceId.New(), Substitute.For(), new StatelessAggregateRoot(), - Substitute.For()); + Substitute.For(), + EventSequenceNumber.First); async Task Because() => _result = await _factory.Create(_context); diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs b/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs index 868602e75..334a00706 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs @@ -28,7 +28,8 @@ void Establish() _eventSourceId, _eventSequence, _aggregateRoot, - _unitOfWork); + _unitOfWork, + EventSequenceNumber.First); _eventTypes = new EventType[] { diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/when_providing_with_events_in_sequence.cs b/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/when_providing_with_events_in_sequence.cs index 68325fb84..6e4ed8386 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/when_providing_with_events_in_sequence.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_ProjectionAggregateRootStateProvider/when_providing_with_events_in_sequence.cs @@ -28,5 +28,4 @@ void Establish() async Task Because() => _result = await _provider.Provide(); [Fact] void should_return_the_state() => _result.ShouldEqual(_state); - [Fact] void should_set_has_events_for_rehydration_to_true() => _aggregateRootContext.HasEventsForRehydration.ShouldBeTrue(); } diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs b/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs index ad60a8b61..d7f1c64d5 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/given/an_aggregate_root_that_handles_two_event_types.cs @@ -24,7 +24,8 @@ void Establish() _eventSourceId, _eventSequence, _aggregateRoot, - _unitOfWork); + _unitOfWork, + EventSequenceNumber.First); _eventTypes = new EventType[] { diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/when_providing_with_events_in_sequence.cs b/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/when_providing_with_events_in_sequence.cs index 16473e09b..99423a49f 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/when_providing_with_events_in_sequence.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_ReducerAggregateRootStateProvider/when_providing_with_events_in_sequence.cs @@ -20,5 +20,4 @@ void Establish() async Task Because() => _result = await _provider.Provide(); [Fact] void should_return_the_state() => _result.ShouldEqual(_state); - [Fact] void should_set_has_events_for_rehydration_to_true() => _aggregateRootContext.HasEventsForRehydration.ShouldBeTrue(); } diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/given/a_stateless_aggregate_root_mutator.cs b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/given/a_stateless_aggregate_root_mutator.cs index 799bdaebf..178153b4c 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/given/a_stateless_aggregate_root_mutator.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/given/a_stateless_aggregate_root_mutator.cs @@ -33,7 +33,8 @@ void Establish() _eventSourceId, _eventSequence, _aggregateRoot, - _unitOfWork); + _unitOfWork, + EventSequenceNumber.First); _eventSerializer = Substitute.For(); _eventHandlers = Substitute.For(); diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_does_not_have_any_handle_methods_but_has_events.cs b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_does_not_have_any_handle_methods_but_has_events.cs index e734a3325..29e30e3c9 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_does_not_have_any_handle_methods_but_has_events.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_does_not_have_any_handle_methods_but_has_events.cs @@ -16,6 +16,5 @@ void Establish() [Fact] void should_not_get_any_events() => _eventSequence.DidNotReceive().GetForEventSourceIdAndEventTypes(_eventSourceId, Arg.Any>()); [Fact] void should_not_handle_any_events() => _eventHandlers.DidNotReceive().Handle(Arg.Any(), Arg.Any>()); - [Fact] void should_ask_if_there_are_events_for_the_event_source_id() => _eventSequence.Received(1).HasEventsFor(_eventSourceId); - [Fact] void should_set_has_events_for_rehydration_to_true() => _aggregateRootContext.HasEventsForRehydration.ShouldBeTrue(); + [Fact] void should_ask_if_there_are_events_for_the_event_source_id() => _eventSequence.Received(1).GetFromSequenceNumber(EventSequenceNumber.First, _eventSourceId, _eventHandlers.EventTypes); } diff --git a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_has_handle_methods.cs b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_has_handle_methods.cs index 65636da3e..c1c7a83d8 100644 --- a/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_has_handle_methods.cs +++ b/Source/Clients/DotNET.Specs/Aggregates/for_StatelessAggregateRootMutator/when_rehydrating/and_aggregate_root_has_handle_methods.cs @@ -27,7 +27,7 @@ void Establish() _secondEvent ]; - _appendedEvents = new AppendedEvent[] + _appendedEvents = new[] { AppendedEvent.EmptyWithContent(_firstEvent), AppendedEvent.EmptyWithContent(_secondEvent) @@ -35,11 +35,9 @@ void Establish() _eventHandlers.HasHandleMethods.Returns(true); _eventSequence - .GetForEventSourceIdAndEventTypes(_eventSourceId, Arg.Any>()) + .GetFromSequenceNumber(EventSequenceNumber.First, _eventSourceId, Arg.Any>()) .Returns(_appendedEvents); - _eventSequence.HasEventsFor(_eventSourceId).Returns(true); - _eventSerializer .Deserialize(Arg.Any()) .Returns((callInfo) => callInfo.Arg().Content); @@ -47,7 +45,6 @@ void Establish() async Task Because() => await _mutator.Rehydrate(); - [Fact] void should_handle_events() => _eventHandlers.Received().Handle(_aggregateRoot, Arg.Is>(arg => arg.Select(_ => _.Event).SequenceEqual(_events))); - [Fact] void should_ask_if_there_are_events_for_the_event_source_id() => _eventSequence.Received(1).HasEventsFor(_eventSourceId); - [Fact] void should_set_has_events_for_rehydration_to_true() => _aggregateRootContext.HasEventsForRehydration.ShouldBeTrue(); + [Fact] void should_handle_events() => _eventHandlers.Received().Handle(_aggregateRoot, Arg.Is>(arg => arg.Select(_ => _.Event).SequenceEqual(_events)), Arg.Any>()); + [Fact] void should_ask_if_there_are_events_for_the_event_source_id() => _eventSequence.Received(1).GetFromSequenceNumber(EventSequenceNumber.First, _eventSourceId, _eventHandlers.EventTypes); } diff --git a/Source/Clients/DotNET/Aggregates/AggregateRoot.cs b/Source/Clients/DotNET/Aggregates/AggregateRoot.cs index c4a390b09..08143f222 100644 --- a/Source/Clients/DotNET/Aggregates/AggregateRoot.cs +++ b/Source/Clients/DotNET/Aggregates/AggregateRoot.cs @@ -1,6 +1,8 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Cratis.Chronicle.Events; + #pragma warning disable SA1402 // File may only contain a single type namespace Cratis.Chronicle.Aggregates; @@ -23,7 +25,7 @@ public class AggregateRoot : IAggregateRoot /// /// Gets a value indicating whether the aggregate root is new. /// - protected bool IsNew => !_context.HasEventsForRehydration; + protected bool IsNew => _context.NextSequenceNumber == EventSequenceNumber.First; /// public async Task Apply(object @event) => await _mutation.Apply(@event); diff --git a/Source/Clients/DotNET/Aggregates/AggregateRootContext.cs b/Source/Clients/DotNET/Aggregates/AggregateRootContext.cs index 0bf812f51..f8211af5a 100644 --- a/Source/Clients/DotNET/Aggregates/AggregateRootContext.cs +++ b/Source/Clients/DotNET/Aggregates/AggregateRootContext.cs @@ -14,11 +14,13 @@ namespace Cratis.Chronicle.Aggregates; /// The for the context. /// The for the context. /// The for the context. +/// The next . public class AggregateRootContext( EventSourceId eventSourceId, IEventSequence eventSequence, IAggregateRoot aggregateRoot, - IUnitOfWork unitOfWork) : IAggregateRootContext + IUnitOfWork unitOfWork, + EventSequenceNumber nextSequenceNumber) : IAggregateRootContext { /// public EventSourceId EventSourceId { get; } = eventSourceId; @@ -33,5 +35,5 @@ public class AggregateRootContext( public IUnitOfWork UnitOfWOrk { get; } = unitOfWork; /// - public bool HasEventsForRehydration { get; set; } + public EventSequenceNumber NextSequenceNumber { get; set; } = nextSequenceNumber; } diff --git a/Source/Clients/DotNET/Aggregates/AggregateRootEventHandlers.cs b/Source/Clients/DotNET/Aggregates/AggregateRootEventHandlers.cs index 683800fa0..08879af05 100644 --- a/Source/Clients/DotNET/Aggregates/AggregateRootEventHandlers.cs +++ b/Source/Clients/DotNET/Aggregates/AggregateRootEventHandlers.cs @@ -37,7 +37,7 @@ public AggregateRootEventHandlers(IEventTypes eventTypes, Type aggregateRootType public IImmutableList EventTypes { get; } /// - public async Task Handle(IAggregateRoot target, IEnumerable events) + public async Task Handle(IAggregateRoot target, IEnumerable events, Action? onHandledEvent = default) { if (_methodsByEventType.Count == 0) return; @@ -59,6 +59,7 @@ public async Task Handle(IAggregateRoot target, IEnumerable eve } if (returnValue is not null) await returnValue; + onHandledEvent?.Invoke(eventAndContext); } } } diff --git a/Source/Clients/DotNET/Aggregates/AggregateRootFactory.cs b/Source/Clients/DotNET/Aggregates/AggregateRootFactory.cs index 9d2f6db2d..78ece583d 100644 --- a/Source/Clients/DotNET/Aggregates/AggregateRootFactory.cs +++ b/Source/Clients/DotNET/Aggregates/AggregateRootFactory.cs @@ -28,12 +28,13 @@ public class AggregateRootFactory( public async Task Get(EventSourceId id) where TAggregateRoot : IAggregateRoot { + // TODO: Create Issue: Must dispose of unit of work in some way or else it's a memory leak. var unitOfWork = unitOfWorkManager.HasCurrent ? unitOfWorkManager.Current : unitOfWorkManager.Begin(CorrelationId.New()); var aggregateRoot = ActivatorUtilities.CreateInstance(serviceProvider); var eventSequence = eventStore.GetEventSequence(EventSequenceId.Log); - var context = new AggregateRootContext(id, eventSequence, aggregateRoot, unitOfWork); + var context = new AggregateRootContext(id, eventSequence, aggregateRoot, unitOfWork, EventSequenceNumber.First); var mutator = await mutatorFactory.Create(context); await mutator.Rehydrate(); diff --git a/Source/Clients/DotNET/Aggregates/AggregateRootMutation.cs b/Source/Clients/DotNET/Aggregates/AggregateRootMutation.cs index b045c1543..3d5156acb 100644 --- a/Source/Clients/DotNET/Aggregates/AggregateRootMutation.cs +++ b/Source/Clients/DotNET/Aggregates/AggregateRootMutation.cs @@ -65,6 +65,12 @@ public async Task Apply(object @event) public async Task Commit() { await aggregateRootContext.UnitOfWOrk.Commit(); + if (aggregateRootContext.UnitOfWOrk.TryGetLastCommittedEventSequenceNumber( + out var lastCommittedEventSequenceNumber)) + { + aggregateRootContext.NextSequenceNumber = lastCommittedEventSequenceNumber + 1; + } + UncommittedEvents = ImmutableList.Empty; return AggregateRootCommitResult.CreateFrom(aggregateRootContext.UnitOfWOrk); } diff --git a/Source/Clients/DotNET/Aggregates/IAggregateRootContext.cs b/Source/Clients/DotNET/Aggregates/IAggregateRootContext.cs index 4ea36a7a8..be0c9c0dc 100644 --- a/Source/Clients/DotNET/Aggregates/IAggregateRootContext.cs +++ b/Source/Clients/DotNET/Aggregates/IAggregateRootContext.cs @@ -35,7 +35,7 @@ public interface IAggregateRootContext IUnitOfWork UnitOfWOrk { get; } /// - /// Gets or sets a value indicating whether there are events available for rehydration. + /// Gets or sets the next to process for the aggregate root in the unit of work. /// - bool HasEventsForRehydration { get; set; } + EventSequenceNumber NextSequenceNumber { get; set; } } diff --git a/Source/Clients/DotNET/Aggregates/IAggregateRootEventHandlers.cs b/Source/Clients/DotNET/Aggregates/IAggregateRootEventHandlers.cs index e40cc91cd..247a04bf7 100644 --- a/Source/Clients/DotNET/Aggregates/IAggregateRootEventHandlers.cs +++ b/Source/Clients/DotNET/Aggregates/IAggregateRootEventHandlers.cs @@ -26,6 +26,7 @@ public interface IAggregateRootEventHandlers /// /// The target to handle for. /// Collection of to handle. + /// The optional callback to do on each handled event. /// Awaitable task. - Task Handle(IAggregateRoot target, IEnumerable events); + Task Handle(IAggregateRoot target, IEnumerable events, Action? onHandledEvent = default); } diff --git a/Source/Clients/DotNET/Aggregates/ProjectionAggregateRootStateProvider.cs b/Source/Clients/DotNET/Aggregates/ProjectionAggregateRootStateProvider.cs index 2153c85ab..6be139379 100644 --- a/Source/Clients/DotNET/Aggregates/ProjectionAggregateRootStateProvider.cs +++ b/Source/Clients/DotNET/Aggregates/ProjectionAggregateRootStateProvider.cs @@ -26,11 +26,6 @@ public class ProjectionAggregateRootStateProvider( typeof(TState), aggregateRootContext.EventSourceId); - if (result.ProjectedEventsCount > 0 && aggregateRootContext is AggregateRootContext actualContext) - { - actualContext.HasEventsForRehydration = true; - } - return (TState?)result.Model; } diff --git a/Source/Clients/DotNET/Aggregates/ReducerAggregateRootStateProvider.cs b/Source/Clients/DotNET/Aggregates/ReducerAggregateRootStateProvider.cs index 9b120c950..ebd51e885 100644 --- a/Source/Clients/DotNET/Aggregates/ReducerAggregateRootStateProvider.cs +++ b/Source/Clients/DotNET/Aggregates/ReducerAggregateRootStateProvider.cs @@ -24,12 +24,6 @@ public class ReducerAggregateRootStateProvider( { var events = await aggregateRootContext.EventSequence.GetForEventSourceIdAndEventTypes(aggregateRootContext.EventSourceId, reducer.EventTypes); var result = await reducer.OnNext(events, null); - - if (result.LastSuccessfullyObservedEvent.IsActualValue && aggregateRootContext is AggregateRootContext actualContext) - { - actualContext.HasEventsForRehydration = true; - } - return (TState?)result.ModelState; } diff --git a/Source/Clients/DotNET/Aggregates/StatelessAggregateRootMutator.cs b/Source/Clients/DotNET/Aggregates/StatelessAggregateRootMutator.cs index 54854c8d9..027d2c0f0 100644 --- a/Source/Clients/DotNET/Aggregates/StatelessAggregateRootMutator.cs +++ b/Source/Clients/DotNET/Aggregates/StatelessAggregateRootMutator.cs @@ -23,11 +23,9 @@ public class StatelessAggregateRootMutator( /// public async Task Rehydrate() { - var hasEventsForRehydration = await aggregateRootContext.EventSequence.HasEventsFor(aggregateRootContext.EventSourceId); - + var events = await aggregateRootContext.EventSequence.GetFromSequenceNumber(aggregateRootContext.NextSequenceNumber, aggregateRootContext.EventSourceId, eventHandlers.EventTypes); if (eventHandlers.HasHandleMethods) { - var events = await aggregateRootContext.EventSequence.GetForEventSourceIdAndEventTypes(aggregateRootContext.EventSourceId, eventHandlers.EventTypes); var deserializedEventsTasks = events.Select(async _ => { var @event = await eventSerializer.Deserialize(_); @@ -35,12 +33,20 @@ public async Task Rehydrate() }).ToArray(); var deserializedEvents = await Task.WhenAll(deserializedEventsTasks); - await eventHandlers.Handle(aggregateRootContext.AggregateRoot, deserializedEvents); - } - if (aggregateRootContext is AggregateRootContext actualContext) - { - actualContext.HasEventsForRehydration = hasEventsForRehydration; + // Scenario is when state is partially modified before throwing an exception. + await eventHandlers.Handle(aggregateRootContext.AggregateRoot, deserializedEvents, handledEventAndContext => + { + var nextSequenceNumber = handledEventAndContext.Context.SequenceNumber.Next(); + if (handledEventAndContext.Context.SequenceNumber == EventSequenceNumber.Unavailable) + { + return; + } + if (nextSequenceNumber.IsActualValue && nextSequenceNumber > aggregateRootContext.NextSequenceNumber) + { + aggregateRootContext.NextSequenceNumber = nextSequenceNumber; + } + }); } } diff --git a/Source/Clients/DotNET/EventSequences/EventSequence.cs b/Source/Clients/DotNET/EventSequences/EventSequence.cs index 0cbd7c4da..c7349b7bc 100644 --- a/Source/Clients/DotNET/EventSequences/EventSequence.cs +++ b/Source/Clients/DotNET/EventSequences/EventSequence.cs @@ -119,6 +119,25 @@ public async Task HasEventsFor(EventSourceId eventSourceId) return result.HasEvents; } + /// + public async Task> GetFromSequenceNumber( + EventSequenceNumber sequenceNumber, + EventSourceId? eventSourceId = default, + IEnumerable? eventTypes = default) + { + var result = await connection.Services.EventSequences.GetEventsFromEventSequenceNumber(new() + { + EventStoreName = eventStoreName, + Namespace = @namespace, + EventSequenceId = eventSequenceId, + EventSequenceNumber = sequenceNumber, + EventSourceId = eventSourceId?.Value ?? default, + EventTypes = eventTypes?.ToContract() ?? [] + }); + + return result.Events.ToClient(); + } + /// public async Task> GetForEventSourceIdAndEventTypes(EventSourceId eventSourceId, IEnumerable eventTypes) { diff --git a/Source/Clients/DotNET/EventSequences/IEventSequence.cs b/Source/Clients/DotNET/EventSequences/IEventSequence.cs index e3fde8356..a6b6d3265 100644 --- a/Source/Clients/DotNET/EventSequences/IEventSequence.cs +++ b/Source/Clients/DotNET/EventSequences/IEventSequence.cs @@ -31,6 +31,15 @@ public interface IEventSequence /// True if it has, false if not. Task HasEventsFor(EventSourceId eventSourceId); + /// + /// Get all events after and including the given with optional and of for filtering. + /// + /// The of the first event to get from. + /// The optional . + /// The optional of . + /// A collection of . + Task> GetFromSequenceNumber(EventSequenceNumber sequenceNumber, EventSourceId? eventSourceId = default, IEnumerable? eventTypes = default); + /// /// Get the next sequence number. /// diff --git a/Source/Clients/DotNET/Transactions/IUnitOfWork.cs b/Source/Clients/DotNET/Transactions/IUnitOfWork.cs index 456a95ca8..56865991a 100644 --- a/Source/Clients/DotNET/Transactions/IUnitOfWork.cs +++ b/Source/Clients/DotNET/Transactions/IUnitOfWork.cs @@ -1,6 +1,7 @@ // Copyright (c) Cratis. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics.CodeAnalysis; using Cratis.Chronicle.Auditing; using Cratis.Chronicle.Events; using Cratis.Chronicle.Events.Constraints; @@ -13,13 +14,21 @@ namespace Cratis.Chronicle.Transactions; /// public interface IUnitOfWork : IDisposable { + /// + /// Gets the value indicating whether the unit of work is completed. + /// + /// + /// Unit of work being completed is semantically equal to it being disposed. + /// + bool IsCompleted { get; } + /// /// Gets the for the . /// CorrelationId CorrelationId { get; } /// - /// Gets a value indicating whether or not the was successful. + /// Gets a value indicating whether the was successful. /// bool IsSuccess { get; } @@ -67,4 +76,11 @@ public interface IUnitOfWork : IDisposable /// /// The callback to call. void OnCompleted(Action callback); + + /// + /// Try to get the of the last committed event. + /// + /// The outputted of the last committed event. + /// True if events were committed, false if not. + bool TryGetLastCommittedEventSequenceNumber([NotNullWhen(true)]out EventSequenceNumber? eventSequenceNumber); } diff --git a/Source/Clients/DotNET/Transactions/UnitOfWork.cs b/Source/Clients/DotNET/Transactions/UnitOfWork.cs index ae007f1c9..51b04a730 100644 --- a/Source/Clients/DotNET/Transactions/UnitOfWork.cs +++ b/Source/Clients/DotNET/Transactions/UnitOfWork.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; using Cratis.Chronicle.Auditing; using Cratis.Chronicle.Events; using Cratis.Chronicle.Events.Constraints; @@ -24,11 +25,16 @@ public class UnitOfWork( readonly ConcurrentDictionary> _events = []; readonly ConcurrentBag _constraintViolations = []; readonly ConcurrentBag _appendErrors = []; + + EventSequenceNumber? _lastCommittedEventSequenceNumber; Action _onCompleted = onCompleted; bool _isCommitted; bool _isRolledBack; EventSequenceNumber _currentSequenceNumber = EventSequenceNumber.First; + /// + public bool IsCompleted => _isCommitted || _isRolledBack; + /// public CorrelationId CorrelationId => correlationId; @@ -76,6 +82,7 @@ public async Task Commit() var result = await eventSequence.AppendMany(sorted); result.ConstraintViolations.ForEach(_constraintViolations.Add); result.Errors.ForEach(_appendErrors.Add); + _lastCommittedEventSequenceNumber = result.SequenceNumbers.OrderBy(_ => _.Value).LastOrDefault(); } _onCompleted(this); @@ -96,6 +103,13 @@ public Task Rollback() /// public void OnCompleted(Action callback) => _onCompleted = callback; + /// + public bool TryGetLastCommittedEventSequenceNumber([NotNullWhen(true)] out EventSequenceNumber? eventSequenceNumber) + { + eventSequenceNumber = _lastCommittedEventSequenceNumber; + return eventSequenceNumber is not null; + } + /// public void Dispose() { diff --git a/Source/Clients/Orleans.XUnit/Aggregates/AggregateRootTestSiloExtensions.cs b/Source/Clients/Orleans.XUnit/Aggregates/AggregateRootTestSiloExtensions.cs index be9877388..aaacd882c 100644 --- a/Source/Clients/Orleans.XUnit/Aggregates/AggregateRootTestSiloExtensions.cs +++ b/Source/Clients/Orleans.XUnit/Aggregates/AggregateRootTestSiloExtensions.cs @@ -44,7 +44,7 @@ public static async Task CreateAggregateRoot(thi var aggregateRoot = await silo.CreateGrainAsync(eventSourceId); var unitOfWork = new UnitOfWork(CorrelationId.New(), _ => { }, Defaults.Instance.EventStore); - await aggregateRoot.SetContext(new AggregateRootContext(eventSourceId, eventLog, aggregateRoot, unitOfWork)); + await aggregateRoot.SetContext(new AggregateRootContext(eventSourceId, eventLog, aggregateRoot, unitOfWork, EventSequenceNumber.First)); return aggregateRoot; } @@ -75,7 +75,7 @@ public static async Task CreateAggregateRoot { }, Defaults.Instance.EventStore); - await aggregateRoot.SetContext(new AggregateRootContext(eventSourceId, eventLog, aggregateRoot, unitOfWork)); + await aggregateRoot.SetContext(new AggregateRootContext(eventSourceId, eventLog, aggregateRoot, unitOfWork, EventSequenceNumber.First)); return aggregateRoot; } diff --git a/Source/Clients/Orleans/Aggregates/AggregateRoot.cs b/Source/Clients/Orleans/Aggregates/AggregateRoot.cs index b91d7390d..58abc8bed 100644 --- a/Source/Clients/Orleans/Aggregates/AggregateRoot.cs +++ b/Source/Clients/Orleans/Aggregates/AggregateRoot.cs @@ -34,7 +34,7 @@ public class AggregateRoot : Grain, IAggregateRoot, IAggregateRootContextHolder /// /// Gets a value indicating whether the aggregate root is new. /// - protected bool IsNew => !Context?.HasEventsForRehydration ?? true; + protected bool IsNew => Context?.NextSequenceNumber == EventSequenceNumber.First; /// public async Task SetContext(IAggregateRootContext context) @@ -112,7 +112,7 @@ public class AggregateRoot : Grain, IAggregateRoot, IAggregateRootContex /// /// Gets a value indicating whether the aggregate root is new. /// - protected bool IsNew => !Context?.HasEventsForRehydration ?? true; + protected bool IsNew => Context?.NextSequenceNumber is null || Context?.NextSequenceNumber != EventSequenceNumber.First; /// public async Task SetContext(IAggregateRootContext context) diff --git a/Source/Clients/Orleans/Transactions/GrainCallContextExtensions.cs b/Source/Clients/Orleans/Transactions/GrainCallContextExtensions.cs new file mode 100644 index 000000000..59812c9a9 --- /dev/null +++ b/Source/Clients/Orleans/Transactions/GrainCallContextExtensions.cs @@ -0,0 +1,21 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Orleans.Aggregates; +using Cratis.Reflection; + +namespace Cratis.Chronicle.Orleans.Transactions; + +/// +/// Extension methods for . +/// +internal static class GrainCallContextExtensions +{ + /// + /// Checks if the is for an aggregate root. + /// + /// The to check. + /// True if message is for aggregate root, false if not. + public static bool IsMessageToAggregateRoot(this IGrainCallContext context) => + context.InterfaceMethod.DeclaringType?.HasInterface() ?? false; +} \ No newline at end of file diff --git a/Source/Clients/Orleans/Transactions/UnitOfWorkIncomingCallFilter.cs b/Source/Clients/Orleans/Transactions/UnitOfWorkIncomingCallFilter.cs index c94dbbda8..7a5abad8a 100644 --- a/Source/Clients/Orleans/Transactions/UnitOfWorkIncomingCallFilter.cs +++ b/Source/Clients/Orleans/Transactions/UnitOfWorkIncomingCallFilter.cs @@ -2,11 +2,11 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using Cratis.Chronicle.Aggregates; +using Cratis.Chronicle.Events; using Cratis.Chronicle.EventSequences; using Cratis.Chronicle.Orleans.Aggregates; using Cratis.Chronicle.Transactions; using Cratis.Execution; -using Cratis.Reflection; using IAggregateRoot = Cratis.Chronicle.Orleans.Aggregates.IAggregateRoot; namespace Cratis.Chronicle.Orleans.Transactions; @@ -23,9 +23,8 @@ public class UnitOfWorkIncomingCallFilter( /// public async Task Invoke(IIncomingGrainCallContext context) { - var correlationId = RequestContext.Get(Constants.CorrelationIdKey) as CorrelationId; - if (correlationId is not null && - (context.InterfaceMethod.DeclaringType?.HasInterface() ?? false) && + if (RequestContext.Get(Constants.CorrelationIdKey) is CorrelationId correlationId && + context.IsMessageToAggregateRoot() && unitOfWorkManager.TryGetFor(correlationId, out var unitOfWork)) { var aggregate = (context.TargetContext.GrainInstance as IAggregateRoot)!; @@ -36,11 +35,12 @@ public async Task Invoke(IIncomingGrainCallContext context) aggregate.GetPrimaryKeyString(), eventStore.GetEventSequence(EventSequenceId.Log), aggregate, - unitOfWork); + unitOfWork, + aggregateContextHolder.Context?.NextSequenceNumber ?? EventSequenceNumber.First); await aggregateContextHolder.SetContext(aggregateRootContext); } await context.Invoke(); } -} +} \ No newline at end of file diff --git a/Source/Clients/Orleans/Transactions/UnitOfWorkOutgoingCallFilter.cs b/Source/Clients/Orleans/Transactions/UnitOfWorkOutgoingCallFilter.cs index 044a36ab6..e92102f68 100644 --- a/Source/Clients/Orleans/Transactions/UnitOfWorkOutgoingCallFilter.cs +++ b/Source/Clients/Orleans/Transactions/UnitOfWorkOutgoingCallFilter.cs @@ -17,7 +17,10 @@ public async Task Invoke(IOutgoingGrainCallContext context) { var correlationId = unitOfWorkManager.HasCurrent ? unitOfWorkManager.Current.CorrelationId : CorrelationId.New(); RequestContext.Set(Constants.CorrelationIdKey, correlationId); - + if (context.IsMessageToAggregateRoot() && !unitOfWorkManager.HasCurrent) + { + unitOfWorkManager.Begin(correlationId); + } await context.Invoke(); } } diff --git a/Source/Clients/XUnit/Aggregates/AggregateRootTestFactory.cs b/Source/Clients/XUnit/Aggregates/AggregateRootTestFactory.cs index fb637d5ed..76cd45c63 100644 --- a/Source/Clients/XUnit/Aggregates/AggregateRootTestFactory.cs +++ b/Source/Clients/XUnit/Aggregates/AggregateRootTestFactory.cs @@ -34,7 +34,8 @@ public static TAggregateRoot Create(EventSourceId eventSourceId, eventSourceId, eventSequence, aggregateRoot, - unitOfWork); + unitOfWork, + EventSequenceNumber.First); var mutator = new AggregateRootMutatorForTesting(); var mutation = new AggregateRootMutation(aggregateRootContext, mutator, eventSequence); @@ -64,7 +65,8 @@ public static AggregateRoot Create(EventSourceId eventSourceId, eventSequence, aggregateRoot, - unitOfWork); + unitOfWork, + EventSequenceNumber.First); var mutator = new AggregateRootMutatorForTesting(); var mutation = new AggregateRootMutation(aggregateRootContext, mutator, eventSequence); diff --git a/Source/Clients/XUnit/Events/EventSequenceForTesting.cs b/Source/Clients/XUnit/Events/EventSequenceForTesting.cs index 0f3b74817..c7f701a78 100644 --- a/Source/Clients/XUnit/Events/EventSequenceForTesting.cs +++ b/Source/Clients/XUnit/Events/EventSequenceForTesting.cs @@ -39,6 +39,16 @@ public class EventSequenceForTesting(IEventTypes eventTypes, params EventForEven /// public Task> GetForEventSourceIdAndEventTypes(EventSourceId eventSourceId, IEnumerable eventTypes) => Task.FromResult>(_events.ToImmutableList()); + /// + public Task> GetFromSequenceNumber( + EventSequenceNumber sequenceNumber, + EventSourceId? eventSourceId = default, + IEnumerable? eventTypes = default) => + Task.FromResult>(_events.Where(_ => + _.Metadata.SequenceNumber >= sequenceNumber + && (eventSourceId is null || _.Context.EventSourceId == eventSourceId) + && eventTypes?.Contains(_.Metadata.Type) != false).ToImmutableList()); + /// public Task GetNextSequenceNumber() => Task.FromResult(EventSequenceNumber.First); diff --git a/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberRequest.cs b/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberRequest.cs new file mode 100644 index 000000000..9ce495f8e --- /dev/null +++ b/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberRequest.cs @@ -0,0 +1,44 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Contracts.Events; +using ProtoBuf; + +namespace Cratis.Chronicle.Contracts.EventSequences; + +/// +/// Represents the payload for getting events from a specific event sequence number and optionally for an event source id and specific event types. +/// +[ProtoContract] +public class GetFromEventSequenceNumberRequest : IEventSequenceRequest +{ + /// + [ProtoMember(1)] + public string EventStoreName { get; set; } + + /// + [ProtoMember(2)] + public string Namespace { get; set; } + + /// + [ProtoMember(3)] + public string EventSequenceId { get; set; } + + /// + /// Gets or sets the event sequence number to get events from. + /// + [ProtoMember(4)] + public ulong EventSequenceNumber { get; set; } + + /// + /// Gets or sets the event source identifier. + /// + [ProtoMember(5)] + public string? EventSourceId { get; set; } + + /// + /// Gets or sets the event types to get. + /// + [ProtoMember(6, IsRequired = true)] + public IList EventTypes { get; set; } = []; +} \ No newline at end of file diff --git a/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberResponse.cs b/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberResponse.cs new file mode 100644 index 000000000..98e5199a0 --- /dev/null +++ b/Source/Kernel/Contracts/EventSequences/GetFromEventSequenceNumberResponse.cs @@ -0,0 +1,20 @@ +// Copyright (c) Cratis. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Cratis.Chronicle.Contracts.Events; +using ProtoBuf; + +namespace Cratis.Chronicle.Contracts.EventSequences; + +/// +/// Represents the response for getting events from an event sequence number and optionally for an event source id and specific event types. +/// +[ProtoContract] +public class GetFromEventSequenceNumberResponse +{ + /// + /// Gets or sets the events. + /// + [ProtoMember(1, IsRequired = true)] + public IList Events { get; set; } = []; +} \ No newline at end of file diff --git a/Source/Kernel/Contracts/EventSequences/IEventSequences.cs b/Source/Kernel/Contracts/EventSequences/IEventSequences.cs index 05e918805..88191d7a5 100644 --- a/Source/Kernel/Contracts/EventSequences/IEventSequences.cs +++ b/Source/Kernel/Contracts/EventSequences/IEventSequences.cs @@ -47,4 +47,13 @@ public interface IEventSequences /// True if it has, false if not. [Operation] Task HasEventsForEventSourceId(HasEventsForEventSourceIdRequest request, CallContext context = default); + + /// + /// Gets events from a specific event sequence number. + /// + /// . + /// gRPC call context. + /// True if it has, false if not. + [Operation] + Task GetEventsFromEventSequenceNumber(GetFromEventSequenceNumberRequest request, CallContext context = default); } diff --git a/Source/Kernel/Services/EventSequences/EventSequences.cs b/Source/Kernel/Services/EventSequences/EventSequences.cs index 0d43b20d5..7e0051bda 100644 --- a/Source/Kernel/Services/EventSequences/EventSequences.cs +++ b/Source/Kernel/Services/EventSequences/EventSequences.cs @@ -88,6 +88,30 @@ public async Task HasEventsForEventSourceId(H }; } + /// + public async Task GetEventsFromEventSequenceNumber( + GetFromEventSequenceNumberRequest request, + CallContext context = default) + { + var eventSequence = GetEventSequenceStorage(request); + + var cursor = await eventSequence.GetFromSequenceNumber( + request.EventSequenceNumber, + string.IsNullOrWhiteSpace(request.EventSourceId) ? null! : request.EventSourceId, + request.EventTypes.ToChronicle()); + + var events = new List(); + while (await cursor.MoveNext()) + { + var current = cursor.Current; + events.AddRange(current.ToContract()); + } + return new() + { + Events = events + }; + } + IEventSequenceStorage GetEventSequenceStorage(IEventSequenceRequest request) => storage.GetEventStore(request.EventStoreName).GetNamespace(request.Namespace).GetEventSequence(request.EventSequenceId);