Skip to content

Commit

Permalink
Merge pull request #1413 from Cratis/feautre/tcs-and-logging
Browse files Browse the repository at this point in the history
Feautre/tcs and logging
  • Loading branch information
woksin authored Sep 19, 2024
2 parents 6a57a0d + e04889c commit 7f48560
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 30 deletions.
4 changes: 1 addition & 3 deletions Source/Clients/DotNET/ChronicleConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,8 @@ async Task Connect()

_channel = CreateGrpcChannel();
_connectionService = _channel.CreateGrpcService<IConnectionService>();

_lastKeepAlive = DateTimeOffset.UtcNow;

_connectTcs = new TaskCompletionSource();
_connectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

_keepAliveSubscription = _connectionService.Connect(new()
{
Expand Down
2 changes: 0 additions & 2 deletions Source/Clients/DotNET/Connections/ConnectionLifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ public async Task Connected()
public async Task Disconnected()
{
IsConnected = false;

logger.Disconnected();

var tasks = OnDisconnected.GetInvocationList().Select(_ => Task.Run(async () =>
{
try
Expand Down
2 changes: 1 addition & 1 deletion Source/Clients/DotNET/Connections/IConnectionLifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface IConnectionLifecycle
event Disconnected OnDisconnected;

/// <summary>
/// Gets whether or not the client is connected.
/// Gets whether, or not the client is connected.
/// </summary>
bool IsConnected { get; }

Expand Down
2 changes: 1 addition & 1 deletion Source/Clients/DotNET/Reactions/Reactors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ async Task ObserverMethod(ISubject<ReactorMessage> messages, ReactorHandler hand

BaseIdentityProvider.SetCurrentIdentity(Identity.System with { OnBehalfOf = context.CausedBy });
var eventType = _eventTypes.GetClrTypeFor(metadata.Type.Id);

var content = await _eventSerializer.Deserialize(eventType, JsonNode.Parse(@event.Content)!.AsObject());

await handler.OnNext(metadata, context, content);
lastSuccessfullyObservedEvent = @event.Metadata.SequenceNumber;
}
catch (Exception ex)
{
_logger.ErrorWhileHandlingEvent(ex, @event.Metadata.Type.Id, handler.Id);
exceptionMessages = ex.GetAllMessages();
exceptionStackTrace = ex.StackTrace ?? string.Empty;
state = ObservationState.Failed;
Expand Down
5 changes: 4 additions & 1 deletion Source/Clients/DotNET/Reactions/ReactorsLogMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ internal static partial class ReactorsLogMessages
internal static partial void DiscoverAllReactors(this ILogger<Reactors> logger);

[LoggerMessage(LogLevel.Trace, "Event of type {EventTypeId} was received for Reactor {ReactorId}")]
internal static partial void EventReceived(this ILogger<Reactors> logger, EventTypeId eventTypeId, ReactorId ReactorId);
internal static partial void EventReceived(this ILogger<Reactors> logger, EventTypeId eventTypeId, ReactorId reactorId);

[LoggerMessage(LogLevel.Warning, "An error occurred while handling event of type {EventTypeId} was for Reactor {ReactorId}")]
internal static partial void ErrorWhileHandlingEvent(this ILogger<Reactors> logger, Exception ex, EventTypeId eventTypeId, ReactorId reactorId);
}
3 changes: 1 addition & 2 deletions Source/Clients/DotNET/Reducers/ReducerInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ public Task<ReduceResult> Invoke(IEnumerable<EventAndContext> eventsAndContexts,

try
{
if (_methodsByEventType.ContainsKey(eventType))
if (_methodsByEventType.TryGetValue(eventType, out var method))
{
var method = _methodsByEventType[eventType];
var parameters = method.GetParameters();

if (parameters.Length == 3)
Expand Down
16 changes: 7 additions & 9 deletions Source/Clients/DotNET/Reducers/Reducers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void RegisterReducer(IReducerHandler handler)
handler.Id,
handler.EventSequenceId);

var registration = new RegisterReducer()
var registration = new RegisterReducer
{
ConnectionId = eventStore.Connection.Lifecycle.ConnectionId,
EventStoreName = eventStore.Name,
Expand All @@ -134,12 +134,12 @@ void RegisterReducer(IReducerHandler handler)
ReducerId = handler.Id,
EventSequenceId = handler.EventSequenceId,
EventTypes = handler.EventTypes.Select(et => new EventTypeWithKeyExpression { EventType = et.ToContract(), Key = "$eventSourceId" }).ToArray(),
Model = new Contracts.Models.ModelDefinition()
Model = new Contracts.Models.ModelDefinition
{
Name = modelNameResolver.GetNameFor(handler.ReadModelType),
Schema = jsonSchemaGenerator.Generate(handler.ReadModelType).ToJson()
},
Sink = new SinkDefinition()
Sink = new SinkDefinition
{
TypeId = WellKnownSinkTypes.MongoDB
}
Expand Down Expand Up @@ -168,27 +168,25 @@ async Task ObserverMethod(ISubject<ReducerMessage> messages, IReducerHandler han
{
var metadata = @event.Metadata.ToClient();
var context = @event.Context.ToClient();
var eventType = eventTypes.GetClrTypeFor(metadata.Type.Id);
var contentAsExpando = JsonSerializer.Deserialize<ExpandoObject>(@event.Content)!;
return new AppendedEvent(
metadata,
context,
contentAsExpando);
});
}).ToList();

try
{
BaseIdentityProvider.SetCurrentIdentity(Identity.System);
var initialState = operation.InitialState is null ? null : JsonSerializer.Deserialize(operation.InitialState, handler.ReadModelType, jsonSerializerOptions);
var reduceResult = await handler.OnNext(
appendedEvents,
initialState);
var reduceResult = await handler.OnNext(appendedEvents, initialState);

modelState = JsonSerializer.Serialize(reduceResult.ModelState, jsonSerializerOptions);
lastSuccessfullyObservedEvent = appendedEvents.Last().Metadata.SequenceNumber;
lastSuccessfullyObservedEvent = appendedEvents[^1].Metadata.SequenceNumber;
}
catch (Exception ex)
{
logger.ErrorWhileHandlingEvents(ex, appendedEvents[0].Context.SequenceNumber, appendedEvents[^1].Context.SequenceNumber, handler.Id);
exceptionMessages = ex.GetAllMessages();
exceptionStackTrace = ex.StackTrace ?? string.Empty;
state = ObservationState.Failed;
Expand Down
4 changes: 4 additions & 0 deletions Source/Clients/DotNET/Reducers/ReducersLogMessages.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Events;
using Cratis.Chronicle.EventSequences;
using Microsoft.Extensions.Logging;

Expand All @@ -13,4 +14,7 @@ internal static partial class ReducersLogMessages

[LoggerMessage(LogLevel.Trace, "Registering reducer with id '{ReducerId}', for event sequence '{EventSequenceId}'")]
internal static partial void RegisterReducer(this ILogger<Reducers> logger, ReducerId reducerId, EventSequenceId eventSequenceId);

[LoggerMessage(LogLevel.Warning, "An error occurred while handling events with sequence number {StartSequenceNumber} to {EndSequenceNumber} was for Reducer {ReducerId}")]
internal static partial void ErrorWhileHandlingEvents(this ILogger<Reducers> logger, Exception ex, EventSequenceNumber startSequenceNumber, EventSequenceNumber endSequenceNumber, ReducerId reducerId);
}
4 changes: 2 additions & 2 deletions Source/Kernel/Grains/Jobs/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ public async Task Start(TRequest request)
await WriteStateAsync();

var grainId = this.GetGrainId();
var tcs = new TaskCompletionSource<IImmutableList<JobStepDetails>>();
var tcs = new TaskCompletionSource<IImmutableList<JobStepDetails>>(TaskCreationOptions.RunContinuationsAsynchronously);

PrepareAllSteps(request, tcs);

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
tcs.Task.ContinueWith(
async (Task<IImmutableList<JobStepDetails>> jobStepsTask) =>
async jobStepsTask =>
{
var jobSteps = await jobStepsTask;
if (jobSteps.Count == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task<ObserverSubscriberResult> OnNext(IEnumerable<AppendedEvent> ev
{
throw new MissingStateForReactorSubscriber(_observerId);
}
var tcs = new TaskCompletionSource<ObserverSubscriberResult>();
var tcs = new TaskCompletionSource<ObserverSubscriberResult>(TaskCreationOptions.RunContinuationsAsynchronously);
try
{
reactorMediator.OnNext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public async Task<ObserverSubscriberResult> OnNext(IEnumerable<AppendedEvent> ev
throw new MissingStateForReducerSubscriber(_observerId);
}

var tcs = new TaskCompletionSource<ObserverSubscriberResult>();
var tcs = new TaskCompletionSource<ObserverSubscriberResult>(TaskCreationOptions.RunContinuationsAsynchronously);
try
{
var firstEvent = events.First();
Expand All @@ -79,7 +79,7 @@ public async Task<ObserverSubscriberResult> OnNext(IEnumerable<AppendedEvent> ev

await (_pipeline?.Handle(reducerContext, async (events, initialState) =>
{
var reducerSubscriberResultTCS = new TaskCompletionSource<ReducerSubscriberResult>();
var reducerSubscriberResultTCS = new TaskCompletionSource<ReducerSubscriberResult>(TaskCreationOptions.RunContinuationsAsynchronously);
reducerMediator.OnNext(
_observerId,
Expand Down
6 changes: 3 additions & 3 deletions Source/Kernel/Services/Observation/Reactions/Reactors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class Reactors(
/// <inheritdoc/>
public IObservable<EventsToObserve> Observe(IObservable<ReactorMessage> messages, CallContext context = default)
{
var registrationTcs = new TaskCompletionSource<RegisterReactor>();
var registrationTcs = new TaskCompletionSource<RegisterReactor>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ObserverSubscriberResult>? observationResultTcs = null;
TaskCompletionSource<IEnumerable<AppendedEvent>>? eventsTcs;
IReactor clientObserver = null!;
Expand Down Expand Up @@ -83,7 +83,7 @@ public IObservable<EventsToObserve> Observe(IObservable<ReactorMessage> messages
connectionId = registration.ConnectionId;
observerId = registration.ObserverId;
eventsTcs = new TaskCompletionSource<IEnumerable<AppendedEvent>>();
eventsTcs = new TaskCompletionSource<IEnumerable<AppendedEvent>>(TaskCreationOptions.RunContinuationsAsynchronously);
reactorMediator.Subscribe(
registration.ObserverId,
registration.ConnectionId,
Expand All @@ -103,7 +103,7 @@ public IObservable<EventsToObserve> Observe(IObservable<ReactorMessage> messages
break;
}
eventsTcs = new TaskCompletionSource<IEnumerable<AppendedEvent>>();
eventsTcs = new TaskCompletionSource<IEnumerable<AppendedEvent>>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
catch (OperationCanceledException)
Expand Down
6 changes: 3 additions & 3 deletions Source/Kernel/Services/Observation/Reducers/Reducers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class Reducers(
/// <inheritdoc/>
public IObservable<ReduceOperationMessage> Observe(IObservable<ReducerMessage> messages, CallContext context = default)
{
var registrationTcs = new TaskCompletionSource<RegisterReducer>();
var registrationTcs = new TaskCompletionSource<RegisterReducer>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ReducerSubscriberResult>? reducerResultTcs = null;
TaskCompletionSource<ReduceOperation>? reduceOperationTcs;

Expand Down Expand Up @@ -103,7 +103,7 @@ public IObservable<ReduceOperationMessage> Observe(IObservable<ReducerMessage> m
connectionId = registration.ConnectionId;
observerId = registration.Reducer.ReducerId;
reduceOperationTcs = new TaskCompletionSource<ReduceOperation>();
reduceOperationTcs = new TaskCompletionSource<ReduceOperation>(TaskCreationOptions.RunContinuationsAsynchronously);
reducerMediator.Subscribe(
registration.Reducer.ReducerId,
registration.ConnectionId,
Expand All @@ -129,7 +129,7 @@ public IObservable<ReduceOperationMessage> Observe(IObservable<ReducerMessage> m
break;
}
reduceOperationTcs = new TaskCompletionSource<ReduceOperation>();
reduceOperationTcs = new TaskCompletionSource<ReduceOperation>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
catch (OperationCanceledException)
Expand Down

0 comments on commit 7f48560

Please sign in to comment.