diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 6eee8c388..05f9c54c3 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -27,6 +27,7 @@ - Adds: Support for `ZMPOP` with `.SortedSetPop()`/`.SortedSetPopAsync()` ([#2094 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2094)) - Adds: Support for `XAUTOCLAIM` with `.StreamAutoClaim()`/.`StreamAutoClaimAsync()` and `.StreamAutoClaimIdsOnly()`/.`StreamAutoClaimIdsOnlyAsync()` ([#2095 by ttingen](https://github.com/StackExchange/StackExchange.Redis/pull/2095)) - Fix [#2071](https://github.com/StackExchange/StackExchange.Redis/issues/2071): Add `.StringSet()`/`.StringSetAsync()` overloads for source compat broken for 1 case in 2.5.61 ([#2098 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2098)) +- Fix [#2086](https://github.com/StackExchange/StackExchange.Redis/issues/2086): Correct HashSlot calculations for `XREAD` and `XREADGROUP` commands ([#2093 by nielsderdaele](https://github.com/StackExchange/StackExchange.Redis/pull/2093)) - Adds: Support for `LCS` with `.StringLongestCommonSubsequence()`/`.StringLongestCommonSubsequence()`, `.StringLongestCommonSubsequenceLength()`/`.StringLongestCommonSubsequenceLengthAsync()`, and `.StringLongestCommonSubsequenceWithMatches()`/`.StringLongestCommonSubsequenceWithMatchesAsync()` ([#2104 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2104)) - Adds: Support for `OBJECT FREQ` with `.KeyFrequency()`/`.KeyFrequencyAsync()` ([#2105 by Avital-Fine](https://github.com/StackExchange/StackExchange.Redis/pull/2105)) - Performance: Avoids allocations when computing cluster hash slots or testing key equality ([#2110 by Marc Gravell](https://github.com/StackExchange/StackExchange.Redis/pull/2110)) diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index a785b502c..38338390c 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -3416,109 +3416,161 @@ private static RedisValue GetLexRange(RedisValue value, Exclude exclude, bool is return result; } - private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags) + private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck, CommandFlags flags) => + new MultiStreamReadGroupCommandMessage(Database, + flags, + streamPositions, + groupName, + consumerName, + countPerStream, + noAck); + + private sealed class MultiStreamReadGroupCommandMessage : Message // XREADGROUP with multiple stream. Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2 { - // Example: XREADGROUP GROUP groupName consumerName COUNT countPerStream STREAMS stream1 stream2 id1 id2 - if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); - if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item."); + private readonly StreamPosition[] streamPositions; + private readonly RedisValue groupName; + private readonly RedisValue consumerName; + private readonly int? countPerStream; + private readonly bool noAck; + private readonly int argCount; - if (countPerStream.HasValue && countPerStream <= 0) + public MultiStreamReadGroupCommandMessage(int db, CommandFlags flags, StreamPosition[] streamPositions, RedisValue groupName, RedisValue consumerName, int? countPerStream, bool noAck) + : base(db, flags, RedisCommand.XREADGROUP) { - throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0."); - } - - var values = new RedisValue[ - 4 // Room for GROUP groupName consumerName & STREAMS - + (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs. - + (countPerStream.HasValue ? 2 : 0) // Room for "COUNT num" or 0 if countPerStream is null. - + (noAck ? 1 : 0)]; // Allow for the NOACK subcommand. + if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); + if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item."); + for (int i = 0; i < streamPositions.Length; i++) + { + streamPositions[i].Key.AssertNotNull(); + } - var offset = 0; + if (countPerStream.HasValue && countPerStream <= 0) + { + throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0."); + } - values[offset++] = StreamConstants.Group; - values[offset++] = groupName; - values[offset++] = consumerName; + groupName.AssertNotNull(); + consumerName.AssertNotNull(); + + this.streamPositions = streamPositions; + this.groupName = groupName; + this.consumerName = consumerName; + this.countPerStream = countPerStream; + this.noAck = noAck; - if (countPerStream.HasValue) - { - values[offset++] = StreamConstants.Count; - values[offset++] = countPerStream; + argCount = 4 // Room for GROUP groupName consumerName & STREAMS + + (streamPositions.Length * 2) // Enough room for the stream keys and associated IDs. + + (countPerStream.HasValue ? 2 : 0) // Room for "COUNT num" or 0 if countPerStream is null. + + (noAck ? 1 : 0); // Allow for the NOACK subcommand. + } - if (noAck) + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { - values[offset++] = StreamConstants.NoAck; + int slot = ServerSelectionStrategy.NoSlot; + for (int i = 0; i < streamPositions.Length; i++) + { + slot = serverSelectionStrategy.CombineSlot(slot, streamPositions[i].Key); + } + return slot; } - values[offset++] = StreamConstants.Streams; + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(Command, argCount); + physical.WriteBulkString(StreamConstants.Group); + physical.WriteBulkString(groupName); + physical.WriteBulkString(consumerName); - var pairCount = streamPositions.Length; + if (countPerStream.HasValue) + { + physical.WriteBulkString(StreamConstants.Count); + physical.WriteBulkString(countPerStream.Value); + } - for (var i = 0; i < pairCount; i++) - { - values[offset] = streamPositions[i].Key.AsRedisValue(); - values[offset + pairCount] = StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREADGROUP); + if (noAck) + { + physical.WriteBulkString(StreamConstants.NoAck); + } - offset++; - } + physical.WriteBulkString(StreamConstants.Streams); + for (int i = 0; i < streamPositions.Length; i++) + { + physical.Write(streamPositions[i].Key); + } + for (int i = 0; i < streamPositions.Length; i++) + { + physical.WriteBulkString(StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREADGROUP)); + } + } - return Message.Create(Database, flags, RedisCommand.XREADGROUP, values); + public override int ArgCount => argCount; } - private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int? countPerStream, CommandFlags flags) - { - // Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 + private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int? countPerStream, CommandFlags flags) => + new MultiStreamReadCommandMessage(Database, flags, streamPositions, countPerStream); - if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); - if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item."); + private sealed class MultiStreamReadCommandMessage : Message // XREAD with multiple stream. Example: XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 + { + private readonly StreamPosition[] streamPositions; + private readonly int? countPerStream; + private readonly int argCount; - if (countPerStream.HasValue && countPerStream <= 0) + public MultiStreamReadCommandMessage(int db, CommandFlags flags, StreamPosition[] streamPositions, int? countPerStream) + : base(db, flags, RedisCommand.XREAD) { - throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0."); - } + if (streamPositions == null) throw new ArgumentNullException(nameof(streamPositions)); + if (streamPositions.Length == 0) throw new ArgumentOutOfRangeException(nameof(streamPositions), "streamOffsetPairs must contain at least one item."); + for (int i = 0; i < streamPositions.Length; i++) + { + streamPositions[i].Key.AssertNotNull(); + } - var values = new RedisValue[ - 1 // Streams keyword. - + (streamPositions.Length * 2) // Room for the stream names and the ID after which to begin reading. - + (countPerStream.HasValue ? 2 : 0)]; // Room for "COUNT num" or 0 if countPerStream is null. + if (countPerStream.HasValue && countPerStream <= 0) + { + throw new ArgumentOutOfRangeException(nameof(countPerStream), "countPerStream must be greater than 0."); + } - var offset = 0; + this.streamPositions = streamPositions; + this.countPerStream = countPerStream; - if (countPerStream.HasValue) - { - values[offset++] = StreamConstants.Count; - values[offset++] = countPerStream; + argCount = 1 // Streams keyword. + + (countPerStream.HasValue ? 2 : 0) // Room for "COUNT num" or 0 if countPerStream is null. + + (streamPositions.Length * 2); // Room for the stream names and the ID after which to begin reading. } - values[offset++] = StreamConstants.Streams; - - // Write the stream names and the message IDs from which to read for the associated stream. Each pair - // will be separated by an offset of the index of the stream name plus the pair count. - - /* - * [0] = COUNT - * [1] = 2 - * [3] = STREAMS - * [4] = stream1 - * [5] = stream2 - * [6] = stream3 - * [7] = id1 - * [8] = id2 - * [9] = id3 - * - * */ - - var pairCount = streamPositions.Length; + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) + { + int slot = ServerSelectionStrategy.NoSlot; + for (int i = 0; i < streamPositions.Length; i++) + { + slot = serverSelectionStrategy.CombineSlot(slot, streamPositions[i].Key); + } + return slot; + } - for (var i = 0; i < pairCount; i++) + protected override void WriteImpl(PhysicalConnection physical) { - values[offset] = streamPositions[i].Key.AsRedisValue(); - values[offset + pairCount] = StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREAD); + physical.WriteHeader(Command, argCount); - offset++; + if (countPerStream.HasValue) + { + physical.WriteBulkString(StreamConstants.Count); + physical.WriteBulkString(countPerStream.Value); + } + + physical.WriteBulkString(StreamConstants.Streams); + for (int i = 0; i < streamPositions.Length; i++) + { + physical.Write(streamPositions[i].Key); + } + for (int i = 0; i < streamPositions.Length; i++) + { + physical.WriteBulkString(StreamPosition.Resolve(streamPositions[i].Position, RedisCommand.XREADGROUP)); + } } - return Message.Create(Database, flags, RedisCommand.XREAD, values); + public override int ArgCount => argCount; } private static RedisValue GetRange(double value, Exclude exclude, bool isStart) @@ -4107,71 +4159,106 @@ private Message GetStreamRangeMessage(RedisKey key, RedisValue? minId, RedisValu values); } - private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, CommandFlags flags) + private Message GetStreamReadGroupMessage(RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck, CommandFlags flags) => + new SingleStreamReadGroupCommandMessage(Database, flags, key, groupName, consumerName, afterId, count, noAck); + + private sealed class SingleStreamReadGroupCommandMessage : Message.CommandKeyBase // XREADGROUP with single stream. eg XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > { - // Example: > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > - if (count.HasValue && count <= 0) + private readonly RedisValue groupName; + private readonly RedisValue consumerName; + private readonly RedisValue afterId; + private readonly int? count; + private readonly bool noAck; + private readonly int argCount; + + public SingleStreamReadGroupCommandMessage(int db, CommandFlags flags, RedisKey key, RedisValue groupName, RedisValue consumerName, RedisValue afterId, int? count, bool noAck) + : base(db, flags, RedisCommand.XREADGROUP, key) { - throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); - } + if (count.HasValue && count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } - var totalValueCount = 6 + (count.HasValue ? 2 : 0) + (noAck ? 1 : 0); - var values = new RedisValue[totalValueCount]; + groupName.AssertNotNull(); + consumerName.AssertNotNull(); + afterId.AssertNotNull(); - var offset = 0; + this.groupName = groupName; + this.consumerName = consumerName; + this.afterId = afterId; + this.count = count; + this.noAck = noAck; + argCount = 6 + (count.HasValue ? 2 : 0) + (noAck ? 1 : 0); + } - values[offset++] = StreamConstants.Group; - values[offset++] = groupName; - values[offset++] = consumerName; + protected override void WriteImpl(PhysicalConnection physical) { + physical.WriteHeader(Command, argCount); + physical.WriteBulkString(StreamConstants.Group); + physical.WriteBulkString(groupName); + physical.WriteBulkString(consumerName); - if (count.HasValue) - { - values[offset++] = StreamConstants.Count; - values[offset++] = count.Value; - } + if (count.HasValue) + { + physical.WriteBulkString(StreamConstants.Count); + physical.WriteBulkString(count.Value); + } - if (noAck) - { - values[offset++] = StreamConstants.NoAck; - } + if (noAck) + { + physical.WriteBulkString(StreamConstants.NoAck); + } - values[offset++] = StreamConstants.Streams; - values[offset++] = key.AsRedisValue(); - values[offset] = afterId; + physical.WriteBulkString(StreamConstants.Streams); + physical.Write(Key); + physical.WriteBulkString(afterId); + } - return Message.Create(Database, - flags, - RedisCommand.XREADGROUP, - values); + public override int ArgCount => argCount; } - private Message GetSingleStreamReadMessage(RedisKey key, RedisValue afterId, int? count, CommandFlags flags) + private Message GetSingleStreamReadMessage(RedisKey key, RedisValue afterId, int? count, CommandFlags flags) => + new SingleStreamReadCommandMessage(Database, flags, key, afterId, count); + + private sealed class SingleStreamReadCommandMessage : Message.CommandKeyBase // XREAD with a single stream. Example: XREAD COUNT 2 STREAMS mystream 0-0 { - if (count.HasValue && count <= 0) + private readonly RedisValue afterId; + private readonly int? count; + private readonly int argCount; + + public SingleStreamReadCommandMessage(int db, CommandFlags flags, RedisKey key, RedisValue afterId, int? count) + : base(db, flags, RedisCommand.XREAD, key) { - throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); - } + if (count.HasValue && count <= 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); + } - var values = new RedisValue[3 + (count.HasValue ? 2 : 0)]; - var offset = 0; + afterId.AssertNotNull(); - if (count.HasValue) - { - values[offset++] = StreamConstants.Count; - values[offset++] = count.Value; + this.afterId = afterId; + this.count = count; + argCount = count.HasValue ? 5 : 3; } - values[offset++] = StreamConstants.Streams; - values[offset++] = key.AsRedisValue(); - values[offset] = afterId; + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(Command, argCount); - // Example: > XREAD COUNT 2 STREAMS writers 1526999352406-0 - return Message.Create(Database, - flags, - RedisCommand.XREAD, - values); + if (count.HasValue) + { + physical.WriteBulkString(StreamConstants.Count); + physical.WriteBulkString(count.Value); + } + + physical.WriteBulkString(StreamConstants.Streams); + physical.Write(Key); + physical.WriteBulkString(afterId); + } + + public override int ArgCount => argCount; } + private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags) { if (maxLength < 0)