Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
the-avid-engineer committed Jan 18, 2024
1 parent bd1371d commit 72b3b03
Show file tree
Hide file tree
Showing 25 changed files with 226 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface IMultipleEntityRepository<TEntity> : IDisposableResource
/// Call this method to load an entity that already exists before calling
/// <see cref="Append" />.
/// </remarks>
Task Load(StatePointer entityPointer, CancellationToken cancellationToken = default);
Task<bool> TryLoad(StatePointer entityPointer, CancellationToken cancellationToken = default);

/// <summary>
/// Returns the state of a <typeparamref name="TEntity" /> for a given <see cref="Id" />.
Expand Down
2 changes: 1 addition & 1 deletion src/EntityDb.Abstractions/Projections/IProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ IAsyncEnumerable<Source> EnumerateSources(IServiceProvider serviceProvider, Stat
/// </summary>
/// <param name="source">A source</param>
/// <returns>The state ids for the projections.</returns>
static abstract IEnumerable<Id> EnumerateRelevantStateIds(Source source);
static abstract IEnumerable<Id> EnumerateProjectionIds(Source source);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ public interface IProjectionRepository<TProjection> : IDisposableResource
/// <param name="projectionPointer">The state pointer to the projection.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>The state of a <typeparamref name="TProjection" /> for <paramref name="projectionPointer" />.</returns>
Task<TProjection> Get(StatePointer projectionPointer, CancellationToken cancellationToken = default);
Task<bool> TryLoad(StatePointer projectionPointer, CancellationToken cancellationToken = default);

TProjection Get(Id projectionId);

Check warning on line 25 in src/EntityDb.Abstractions/Projections/IProjectionRepository.cs

View workflow job for this annotation

GitHub Actions / Publish to Beta

Missing XML comment for publicly visible type or member 'IProjectionRepository<TProjection>.Get(Id)'

Check warning on line 25 in src/EntityDb.Abstractions/Projections/IProjectionRepository.cs

View workflow job for this annotation

GitHub Actions / Publish to Beta

Missing XML comment for publicly visible type or member 'IProjectionRepository<TProjection>.Get(Id)'
}
5 changes: 2 additions & 3 deletions src/EntityDb.Abstractions/States/IState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ public interface IState<TState>
/// You would use this if you intent to fetch a state at multiple versions and don't want to hit
/// the source database when it can be avoided.
/// </remarks>
bool ShouldRecord();
bool ShouldPersist() => false;

/// <summary>
/// Indicates if this state instance should be recorded as the latest state.
/// </summary>
/// <param name="previousLatestState">The previous instance of the latest state.</param>
/// <returns><c>true</c> if this state instance should be recorded as the latest state, or else <c>false</c>.</returns>
bool ShouldRecordAsLatest(TState? previousLatestState);
bool ShouldPersistAsLatest() => false;
}
28 changes: 28 additions & 0 deletions src/EntityDb.Abstractions/States/StatePointer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,34 @@ public override string ToString()
{
return $"{Id}@{StateVersion}";
}

/// <summary>
/// Checks if the state pointer found satisfies the state pointer requested.
/// </summary>
/// <param name="candidateStatePointer">A candidate state pointer.</param>
public bool IsSatisfiedBy(StatePointer candidateStatePointer)
{
if (Id != candidateStatePointer.Id)
{
return false;
}

if (StateVersion == StateVersion.Zero)
{
return candidateStatePointer.StateVersion != StateVersion.Zero;
}

return StateVersion == candidateStatePointer.StateVersion;
}

/// <summary>
/// Returns the next state pointer.
/// </summary>
/// <returns>The next state pointer.</returns>
public StatePointer Previous()
{
return Id + StateVersion.Previous();
}

/// <summary>
/// Returns the next state pointer.
Expand Down
9 changes: 9 additions & 0 deletions src/EntityDb.Abstractions/States/StateVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ public readonly record struct StateVersion(ulong Value)
/// </summary>
public static readonly StateVersion One = new(1);

/// <summary>
/// Returns the next version.
/// </summary>
/// <returns>The next version.</returns>
public StateVersion Previous()
{
return new StateVersion(Value - 1);
}

/// <summary>
/// Returns the next version.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/EntityDb.Common/Entities/EntityRepositoryFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public async Task<ISingleEntityRepository<TEntity>> CreateSingleForExisting
var multipleEntityRepository = await CreateMultiple(agentSignatureOptionsName, sourceSessionOptionsName,
stateSessionOptionsName, cancellationToken);

await multipleEntityRepository.Load(entityPointer, cancellationToken);
await multipleEntityRepository.TryLoad(entityPointer, cancellationToken);

return new SingleEntityRepository<TEntity>(multipleEntityRepository, entityPointer);
}
Expand Down
55 changes: 37 additions & 18 deletions src/EntityDb.Common/Entities/MultipleEntityRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,41 +48,60 @@ public void Create(Id entityId)
_knownEntities.Add(entityId, entity);
}

public async Task Load(StatePointer entityPointer, CancellationToken cancellationToken = default)
public async Task<bool> TryLoad(StatePointer entityPointer, CancellationToken cancellationToken = default)
{
if (_knownEntities.ContainsKey(entityPointer.Id))
var entityId = entityPointer.Id;

if (_knownEntities.TryGetValue(entityId, out var entity))
{
throw new ExistingEntityException();
var knownEntityPointer = entity.GetPointer();

if (entityPointer.IsSatisfiedBy(knownEntityPointer))
{
return true;
}

if (entityPointer.StateVersion.Value < knownEntityPointer.StateVersion.Value)
{
return false;
}
}
else if (StateRepository is not null)
{
entity = await StateRepository.Get(entityPointer, cancellationToken) ??
TEntity.Construct(entityId);
}
else
{
entity = TEntity.Construct(entityId);
}

var state = StateRepository is not null
? await StateRepository.Get(entityPointer, cancellationToken) ??
TEntity.Construct(entityPointer.Id)
: TEntity.Construct(entityPointer.Id);

var statePointer = state.GetPointer();

var query = new GetDeltasDataQuery(entityPointer, statePointer.StateVersion);
var query = new GetDeltasDataQuery(entityPointer, entity.GetPointer().StateVersion);

var entity = await SourceRepository
entity = await SourceRepository
.EnumerateDeltas(query, cancellationToken)
.AggregateAsync
(
state,
(current, delta) => current.Reduce(delta),
entity,
(previousEntity, delta) => previousEntity.Reduce(delta),
cancellationToken
);

StateDoesNotExistException.ThrowIfNotAcceptable(entityPointer, entity.GetPointer());
if (!entityPointer.IsSatisfiedBy(entity.GetPointer()))
{
return false;
}

_knownEntities.Add(entityPointer.Id, entity);
_knownEntities.Add(entityId, entity);

return true;
}

public TEntity Get(Id entityId)
{
if (!_knownEntities.TryGetValue(entityId, out var entity))
{
throw new UnknownEntityIdException();
throw new UnknownEntityException();
}

return entity;
Expand All @@ -92,7 +111,7 @@ public void Append(Id entityId, object delta)
{
if (!_knownEntities.TryGetValue(entityId, out var entity))
{
throw new UnknownEntityIdException();
throw new UnknownEntityException();
}

entity = entity.Reduce(delta);
Expand Down
2 changes: 1 addition & 1 deletion src/EntityDb.Common/Exceptions/ExistingEntityException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace EntityDb.Common.Exceptions;

/// <summary>
/// The exception that is thrown when <see cref="IMultipleEntityRepository{TEntity}.Create" /> or
/// <see cref="IMultipleEntityRepository{TEntity}.Load" /> is called for an entity that already
/// <see cref="IMultipleEntityRepository{TEntity}.TryLoad" /> is called for an entity that already
/// exists in the repository.
/// </summary>
public sealed class ExistingEntityException : Exception;
40 changes: 0 additions & 40 deletions src/EntityDb.Common/Exceptions/StateDoesNotExistException.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ namespace EntityDb.Common.Exceptions;
/// <see cref="IMultipleEntityRepository{TEntity}.Append" /> is called for an entity that
/// is not loaded into the repository.
/// </summary>
public sealed class UnknownEntityIdException : Exception;
public sealed class UnknownEntityException : Exception;
3 changes: 3 additions & 0 deletions src/EntityDb.Common/Exceptions/UnknownProjectionException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace EntityDb.Common.Exceptions;

public sealed class UnknownProjectionException : Exception;
51 changes: 45 additions & 6 deletions src/EntityDb.Common/Projections/ProjectionRepository.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using EntityDb.Abstractions;
using EntityDb.Abstractions.Projections;
using EntityDb.Abstractions.States;
using EntityDb.Common.Disposables;
using EntityDb.Common.Exceptions;
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics.CodeAnalysis;

namespace EntityDb.Common.Projections;

Expand All @@ -11,6 +13,7 @@ internal sealed class ProjectionRepository<TProjection> : DisposableResourceBase
where TProjection : IProjection<TProjection>
{
private readonly IServiceProvider _serviceProvider;
private readonly Dictionary<Id, TProjection> _knownProjections = new();

public ProjectionRepository
(
Expand All @@ -24,12 +27,33 @@ public ProjectionRepository

public IStateRepository<TProjection>? StateRepository { get; }

public async Task<TProjection> Get(StatePointer projectionPointer, CancellationToken cancellationToken = default)
public async Task<bool> TryLoad(StatePointer projectionPointer, CancellationToken cancellationToken = default)
{
var projection = StateRepository is not null
? await StateRepository.Get(projectionPointer, cancellationToken) ??
TProjection.Construct(projectionPointer.Id)
: TProjection.Construct(projectionPointer.Id);
var projectionId = projectionPointer.Id;

if (_knownProjections.TryGetValue(projectionId, out var projection))
{
var knownProjectionPointer = projection.GetPointer();

if (projectionPointer.IsSatisfiedBy(knownProjectionPointer))
{
return true;
}

if (projectionPointer.StateVersion.Value < knownProjectionPointer.StateVersion.Value)
{
return false;
}
}
else if (StateRepository is not null)
{
projection = await StateRepository.Get(projectionPointer, cancellationToken) ??
TProjection.Construct(projectionId);
}
else
{
projection = TProjection.Construct(projectionId);
}

var sources = projection.EnumerateSources(_serviceProvider, projectionPointer, cancellationToken);

Expand All @@ -38,7 +62,22 @@ public async Task<TProjection> Get(StatePointer projectionPointer, CancellationT
projection.Mutate(source);
}

StateDoesNotExistException.ThrowIfNotAcceptable(projectionPointer, projection.GetPointer());
if (!projectionPointer.IsSatisfiedBy(projection.GetPointer()))
{
return false;
}

_knownProjections.Add(projectionId, projection);

return true;
}

public TProjection Get(Id projectionId)
{
if (!_knownProjections.TryGetValue(projectionId, out var projection))
{
throw new UnknownProjectionException();
}

return projection;
}
Expand Down
Loading

0 comments on commit 72b3b03

Please sign in to comment.