Skip to content

Commit

Permalink
URGENT Fix error in batch/transaction handling (#2177)
Browse files Browse the repository at this point in the history
* PrepareScript should work for parameterless scripts; fix #2164

* update link in release notes

* tyop

* Batch/Transaction need to override new ExecuteAsync API; fix #2167 fix #2176

* Maintain correctness on NRTs and asyncState

Previously the F+F case was getting a null default rather than the pass default value - this corrects that as well. It is not DRY across classes, but let's get this fix in ASAP then address that.

* test both batch and transaction

* bump

Co-authored-by: Nick Craver <nrcraver@gmail.com>
  • Loading branch information
mgravell and NickCraver authored Jun 28, 2022
1 parent bc0c5a2 commit 0ebe530
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Pending

- URGENT Fix: [#2167](https://github.com/StackExchange/StackExchange.Redis/issues/2167), [#2176](https://github.com/StackExchange/StackExchange.Redis/issues/2176): fix error in batch/transaction handling that can result in out-of-order instructions ([#2177 by MarcGravell](https://github.com/StackExchange/StackExchange.Redis/pull/2177))
- Fix: [#2164](https://github.com/StackExchange/StackExchange.Redis/issues/2164): fix `LuaScript.Prepare` for scripts that don't have parameters ([#2166 by MarcGravell](https://github.com/StackExchange/StackExchange.Redis/pull/2166))

## 2.6.45
Expand Down
23 changes: 23 additions & 0 deletions src/StackExchange.Redis/RedisBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@ public void Execute()
}
}

internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
{
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
multiplexer.CheckMessage(message);

// prepare the inner command as a task
Task<T> task;
if (message.IsFireAndForget)
{
task = CompletedTask<T>.FromDefault(defaultValue, null); // F+F explicitly does not get async-state
}
else
{
var source = TaskResultBox<T>.Create(out var tcs, asyncState);
task = tcs.Task;
message.SetSource(source, processor);
}

// store it
(pending ??= new List<Message>()).Add(message);
return task!;
}

internal override Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null) where T : default
{
if (message == null) return CompletedTask<T>.Default(asyncState);
Expand Down
32 changes: 31 additions & 1 deletion src/StackExchange.Redis/RedisTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,30 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override
}

internal override Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, T defaultValue, ServerEndPoint? server = null)
{
if (message == null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
multiplexer.CheckMessage(message);

multiplexer.Trace("Wrapping " + message.Command, "Transaction");
// prepare the inner command as a task
Task<T> task;
if (message.IsFireAndForget)
{
task = CompletedTask<T>.FromDefault(defaultValue, null); // F+F explicitly does not get async-state
}
else
{
var source = TaskResultBox<T>.Create(out var tcs, asyncState);
message.SetSource(source, processor);
task = tcs.Task;
}

QueueMessage(message);

return task;
}

internal override Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null) where T : default
{
if (message == null) return CompletedTask<T>.Default(asyncState);
Expand All @@ -75,6 +99,13 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
task = tcs.Task;
}

QueueMessage(message);

return task;
}

private void QueueMessage(Message message)
{
// prepare an outer-command that decorates that, but expects QUEUED
var queued = new QueuedMessage(message);
var wasQueued = SimpleResultBox<bool>.Create();
Expand Down Expand Up @@ -102,7 +133,6 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
break;
}
}
return task;
}

internal override T? ExecuteSync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null, T? defaultValue = default) where T : default
Expand Down
83 changes: 83 additions & 0 deletions tests/StackExchange.Redis.Tests/Issues/Issue2176.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests.Issues
{
public class Issue2176 : TestBase
{
public Issue2176(ITestOutputHelper output) : base(output) { }

[Fact]
public void Execute_Batch()
{
using var conn = Create();
var db = conn.GetDatabase();

var me = Me();
var key = me + ":1";
var key2 = me + ":2";
var keyIntersect = me + ":result";

db.KeyDelete(key);
db.KeyDelete(key2);
db.KeyDelete(keyIntersect);
db.SortedSetAdd(key, "a", 1345);

var tasks = new List<Task>();
var batch = db.CreateBatch();
tasks.Add(batch.SortedSetAddAsync(key2, "a", 4567));
tasks.Add(batch.SortedSetCombineAndStoreAsync(SetOperation.Intersect,
keyIntersect, new RedisKey[] { key, key2 }));
var rangeByRankTask = batch.SortedSetRangeByRankAsync(keyIntersect);
tasks.Add(rangeByRankTask);
batch.Execute();

Task.WhenAll(tasks.ToArray());

var rangeByRankSortedSetValues = rangeByRankTask.Result;

int size = rangeByRankSortedSetValues.Length;
Assert.Equal(1, size);
string firstRedisValue = rangeByRankSortedSetValues.FirstOrDefault().ToString();
Assert.Equal("a", firstRedisValue);
}

[Fact]
public void Execute_Transaction()
{
using var conn = Create();
var db = conn.GetDatabase();

var me = Me();
var key = me + ":1";
var key2 = me + ":2";
var keyIntersect = me + ":result";

db.KeyDelete(key);
db.KeyDelete(key2);
db.KeyDelete(keyIntersect);
db.SortedSetAdd(key, "a", 1345);

var tasks = new List<Task>();
var batch = db.CreateTransaction();
tasks.Add(batch.SortedSetAddAsync(key2, "a", 4567));
tasks.Add(batch.SortedSetCombineAndStoreAsync(SetOperation.Intersect,
keyIntersect, new RedisKey[] { key, key2 }));
var rangeByRankTask = batch.SortedSetRangeByRankAsync(keyIntersect);
tasks.Add(rangeByRankTask);
batch.Execute();

Task.WhenAll(tasks.ToArray());

var rangeByRankSortedSetValues = rangeByRankTask.Result;

int size = rangeByRankSortedSetValues.Length;
Assert.Equal(1, size);
string firstRedisValue = rangeByRankSortedSetValues.FirstOrDefault().ToString();
Assert.Equal("a", firstRedisValue);
}
}
}

0 comments on commit 0ebe530

Please sign in to comment.