Skip to content

Commit

Permalink
bugfix: stop creating/disposing connection multiplexer (#30)
Browse files Browse the repository at this point in the history
* bugfix: stop creating/disposing connection multiplexer

according to docs it should be a very rare event that the connection multiplexer is disposed

* bugfix(test): test error

* Empty commit to make codacy send new result
  • Loading branch information
the-avid-engineer authored Apr 6, 2022
1 parent 80659fe commit f4f12d7
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/EntityDb.Redis/Sessions/IRedisSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ internal interface IRedisSession : IDisposableResource
{
Task<bool> Insert(RedisKey redisKey, RedisValue redisValue);
Task<RedisValue> Find(RedisKey redisKey);
Task<bool> Delete(IEnumerable<RedisKey> redisKeys);
Task<bool> Delete(RedisKey[] redisKeys);
}
102 changes: 81 additions & 21 deletions src/EntityDb.Redis/Sessions/RedisSession.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
using EntityDb.Common.Disposables;
using EntityDb.Common.Exceptions;
using EntityDb.Common.Snapshots;
using EntityDb.Common.Transactions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace EntityDb.Redis.Sessions;

internal sealed record RedisSession(IConnectionMultiplexer ConnectionMultiplexer, SnapshotSessionOptions SnapshotSessionOptions) : DisposableResourceBaseRecord, IRedisSession
internal sealed record RedisSession
(
ILogger<RedisSession> Logger,
IDatabase Database,
SnapshotSessionOptions SnapshotSessionOptions
) : DisposableResourceBaseRecord, IRedisSession
{
private CommandFlags GetCommandFlags()
{
Expand All @@ -34,29 +43,71 @@ public async Task<bool> Insert(RedisKey redisKey, RedisValue redisValue)
{
AssertNotReadOnly();

ConnectionMultiplexer.GetDatabase();
Logger
.LogInformation
(
"Started Running Redis Insert on `{DatabaseIndex}.{RedisKey}`",
Database.Database,
redisKey.ToString()
);

var redisTransaction = Database.CreateTransaction();

var redisTransaction = ConnectionMultiplexer.GetDatabase().CreateTransaction();

var insertedTask = redisTransaction.StringSetAsync(redisKey, redisValue);

await redisTransaction.ExecuteAsync(GetCommandFlags());

return await insertedTask;
var inserted = await insertedTask;

Logger
.LogInformation
(
"Finished Running Redis Insert on `{DatabaseIndex}.{RedisKey}`\n\nInserted: {Inserted}",
Database.Database,
redisKey.ToString(),
inserted
);

return inserted;
}

public async Task<RedisValue> Find(RedisKey redisKey)
{
var redisDatabase = ConnectionMultiplexer.GetDatabase();

return await redisDatabase.StringGetAsync(redisKey, GetCommandFlags());
Logger
.LogInformation
(
"Started Running Redis Query on `{DatabaseIndex}.{RedisKey}`",
Database.Database,
redisKey.ToString()
);

var redisValue = await Database.StringGetAsync(redisKey, GetCommandFlags());

Logger
.LogInformation
(
"Finished Running Redis Query on `{DatabaseIndex}.{RedisKey}`\n\nHas Value: {HasValue}",
Database.Database,
redisKey.ToString(),
redisValue.HasValue
);

return redisValue;
}

public async Task<bool> Delete(IEnumerable<RedisKey> redisKeys)
public async Task<bool> Delete(RedisKey[] redisKeys)
{
AssertNotReadOnly();

var redisTransaction = ConnectionMultiplexer.GetDatabase().CreateTransaction();
Logger
.LogInformation
(
"Started Running Redis Delete on `{DatabaseIndex}` for {NumberOfKeys} Key(s)",
Database.Database,
redisKeys.Length
);

var redisTransaction = Database.CreateTransaction();

var deleteSnapshotTasks = redisKeys
.Select(key => redisTransaction.KeyDeleteAsync(key))
Expand All @@ -65,19 +116,28 @@ public async Task<bool> Delete(IEnumerable<RedisKey> redisKeys)
await redisTransaction.ExecuteAsync(GetCommandFlags());

await Task.WhenAll(deleteSnapshotTasks);

return deleteSnapshotTasks.All(task => task.Result);
}

public override void Dispose()
{

var allDeleted = deleteSnapshotTasks.All(task => task.Result);

Logger
.LogInformation
(
"Finished Running Redis Delete on `{DatabaseIndex}` for {NumberOfKeys} Key(s)\n\nAll Deleted: {AllDeleted}",
Database.Database,
redisKeys.Length,
allDeleted
);

return allDeleted;
}

public override ValueTask DisposeAsync()

public static IRedisSession Create
(
IServiceProvider serviceProvider,
IDatabase database,
SnapshotSessionOptions snapshotSessionOptions
)
{
ConnectionMultiplexer.Dispose();

return ValueTask.CompletedTask;
return ActivatorUtilities.CreateInstance<RedisSession>(serviceProvider, database, snapshotSessionOptions);
}
}
4 changes: 2 additions & 2 deletions src/EntityDb.Redis/Snapshots/RedisSnapshotRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ IRedisSession redisSession

private RedisKey GetSnapshotKey(Id snapshotId)
{
return $"{_keyNamespace}#{snapshotId}";
return $"{_keyNamespace}#{snapshotId.Value}";
}

public async Task<bool> PutSnapshot(Id snapshotId, TSnapshot snapshot, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -61,7 +61,7 @@ public async Task<bool> PutSnapshot(Id snapshotId, TSnapshot snapshot, Cancellat

public async Task<bool> DeleteSnapshots(Id[] snapshotIds, CancellationToken cancellationToken = default)
{
var snapshotKeys = snapshotIds.Select(GetSnapshotKey);
var snapshotKeys = snapshotIds.Select(GetSnapshotKey).ToArray();

return await _redisSession.Delete(snapshotKeys).WaitAsync(cancellationToken);
}
Expand Down
34 changes: 31 additions & 3 deletions src/EntityDb.Redis/Snapshots/RedisSnapshotRepositoryFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ internal class RedisSnapshotRepositoryFactory<TSnapshot> : DisposableResourceBas
private readonly IEnvelopeService<JsonElement> _envelopeService;
private readonly string _connectionString;
private readonly string _keyNamespace;
private readonly SemaphoreSlim _connectionSemaphore = new(1);

private IConnectionMultiplexer? _connectionMultiplexer;

public RedisSnapshotRepositoryFactory
(
Expand All @@ -39,13 +42,31 @@ string keyNamespace
_keyNamespace = keyNamespace;
}

private async Task<IRedisSession> CreateSession(SnapshotSessionOptions snapshotSessionOptions, CancellationToken cancellationToken)
private async Task<IConnectionMultiplexer> OpenConnectionMultiplexer(CancellationToken cancellationToken)
{
await _connectionSemaphore.WaitAsync(cancellationToken);

if (_connectionMultiplexer != null)
{
_connectionSemaphore.Release();

return _connectionMultiplexer;
}

var configurationOptions = ConfigurationOptions.Parse(_connectionString);

_connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(configurationOptions).WaitAsync(cancellationToken);

var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(configurationOptions).WaitAsync(cancellationToken);
_connectionSemaphore.Release();

return new RedisSession(connectionMultiplexer, snapshotSessionOptions);
return _connectionMultiplexer;
}

private async Task<IRedisSession> CreateSession(SnapshotSessionOptions snapshotSessionOptions, CancellationToken cancellationToken)
{
var connectionMultiplexer = await OpenConnectionMultiplexer(cancellationToken);

return RedisSession.Create(_serviceProvider, connectionMultiplexer.GetDatabase(), snapshotSessionOptions);
}

public async Task<ISnapshotRepository<TSnapshot>> CreateRepository(string snapshotSessionOptionsName, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -74,4 +95,11 @@ public static RedisSnapshotRepositoryFactory<TSnapshot> Create(IServiceProvider
keyNamespace
);
}

public override ValueTask DisposeAsync()
{
_connectionMultiplexer?.Dispose();

return ValueTask.CompletedTask;
}
}
2 changes: 1 addition & 1 deletion test/EntityDb.Redis.Tests/Sessions/RedisSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task WhenExecutingWriteMethods_ThenThrow()
{
// ARRANGE

var readOnlyRedisSession = new RedisSession(default!, new SnapshotSessionOptions
var readOnlyRedisSession = new RedisSession(default!, default!, new SnapshotSessionOptions
{
ReadOnly = true
});
Expand Down

0 comments on commit f4f12d7

Please sign in to comment.