Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moving pre 2.6.45 write commands that were not previously considered primary-only out of primary-only #2183

Merged
merged 3 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release Notes

## Unreleased

- Fix [#2182](https://github.com/StackExchange/StackExchange.Redis/issues/2182): Be more flexible in which commands are "primary only" in order to support users with replicas that are explicitly configured to allow writes ([#2183 by slorello89](https://github.com/StackExchange/StackExchange.Redis/pull/2183))

## 2.6.48

- URGENT Fix: [#2167](https://github.com/StackExchange/StackExchange.Redis/issues/2167), [#2176](https://github.com/StackExchange/StackExchange.Redis/issues/2176): fix error in batch/transaction handling that can result in out-of-order instructions ([#2177 by MarcGravell](https://github.com/StackExchange/StackExchange.Redis/pull/2177))
Expand Down
22 changes: 11 additions & 11 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.BLPOP:
case RedisCommand.BRPOP:
case RedisCommand.BRPOPLPUSH:
case RedisCommand.COPY:
case RedisCommand.DECR:
case RedisCommand.DECRBY:
case RedisCommand.DEL:
Expand All @@ -273,7 +272,6 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.EXPIRETIME:
case RedisCommand.FLUSHALL:
case RedisCommand.FLUSHDB:
case RedisCommand.GEOADD:
case RedisCommand.GEOSEARCHSTORE:
case RedisCommand.GETDEL:
case RedisCommand.GETEX:
Expand Down Expand Up @@ -323,21 +321,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.SETRANGE:
case RedisCommand.SINTERSTORE:
case RedisCommand.SMOVE:
case RedisCommand.SORT:
case RedisCommand.SPOP:
case RedisCommand.SREM:
case RedisCommand.SUNIONSTORE:
case RedisCommand.SWAPDB:
case RedisCommand.TOUCH:
case RedisCommand.UNLINK:
case RedisCommand.XACK:
case RedisCommand.XADD:
case RedisCommand.XAUTOCLAIM:
case RedisCommand.XCLAIM:
case RedisCommand.XDEL:
case RedisCommand.XGROUP:
case RedisCommand.XREADGROUP:
case RedisCommand.XTRIM:
case RedisCommand.ZADD:
case RedisCommand.ZDIFFSTORE:
case RedisCommand.ZINTERSTORE:
Expand Down Expand Up @@ -447,7 +437,6 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.UNWATCH:
case RedisCommand.WATCH:
// Stream commands verified working on replicas
case RedisCommand.XINFO:
case RedisCommand.XLEN:
case RedisCommand.XPENDING:
Expand All @@ -474,6 +463,17 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.ZSCORE:
case RedisCommand.ZUNION:
case RedisCommand.UNKNOWN:
// Writable commands, but allowed for the writable-replicas scenario
case RedisCommand.COPY:
case RedisCommand.GEOADD:
case RedisCommand.SORT:
case RedisCommand.XACK:
case RedisCommand.XADD:
case RedisCommand.XCLAIM:
case RedisCommand.XDEL:
case RedisCommand.XGROUP:
case RedisCommand.XREADGROUP:
case RedisCommand.XTRIM:
return false;
default:
throw new ArgumentOutOfRangeException(nameof(command), $"Every RedisCommand must be defined in Message.IsPrimaryOnly, unknown command '{command}' encountered.");
Expand Down
88 changes: 0 additions & 88 deletions tests/StackExchange.Redis.Tests/Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,94 +26,6 @@ public void IsStreamType()
Assert.Equal(RedisType.Stream, keyType);
}

[Fact]
public void StreamOpsFailOnReplica()
{
using var conn = Create(configuration: TestConfig.Current.PrimaryServerAndPort, require: RedisFeatures.v5_0_0);
using var replicaConn = Create(configuration: TestConfig.Current.ReplicaServerAndPort, require: RedisFeatures.v5_0_0);

var db = conn.GetDatabase();
var replicaDb = replicaConn.GetDatabase();

// XADD: Works on primary, not secondary
db.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1");
var ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamAdd(GetUniqueKey("auto_id"), "field1", "value1"));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// Add stream content to primary
var key = GetUniqueKey("group_ack");
const string groupName1 = "test_group1",
groupName2 = "test_group2",
consumer1 = "test_consumer1",
consumer2 = "test_consumer2";

// Add for primary
var id1 = db.StreamAdd(key, "field1", "value1");
var id2 = db.StreamAdd(key, "field2", "value2");
var id3 = db.StreamAdd(key, "field3", "value3");
var id4 = db.StreamAdd(key, "field4", "value4");

// XGROUP: Works on primary, not replica
db.StreamCreateConsumerGroup(key, groupName1, StreamPosition.Beginning);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamCreateConsumerGroup(key, groupName2, StreamPosition.Beginning));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// Create the second group on the primary, for the rest of the tests.
db.StreamCreateConsumerGroup(key, groupName2, StreamPosition.Beginning);

// XREADGROUP: Works on primary, not replica
// Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName1, consumer1, StreamPosition.NewMessages);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamReadGroup(key, groupName2, consumer2, StreamPosition.NewMessages));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XACK: Works on primary, not secondary
var oneAck = db.StreamAcknowledge(key, groupName1, id1);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamAcknowledge(key, groupName2, id1));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XPENDING: Works on primary and replica
// Get the pending messages for consumer2.
var pendingMessages = db.StreamPendingMessages(key, groupName1, 10, consumer1);
var pendingMessages2 = replicaDb.StreamPendingMessages(key, groupName2, 10, consumer2);

// XCLAIM: Works on primary, not replica
// Claim the messages for consumer1.
var messages = db.StreamClaim(key, groupName1, consumer1, 0, messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray());
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamClaim(key, groupName2, consumer2, 0, messageIds: pendingMessages.Select(pm => pm.MessageId).ToArray()));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XDEL: Works on primary, not replica
db.StreamDelete(key, new RedisValue[] { id4 });
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamDelete(key, new RedisValue[] { id3 }));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);

// XINFO: Works on primary and replica
db.StreamInfo(key);
replicaDb.StreamInfo(key);

// XLEN: Works on primary and replica
db.StreamLength(key);
replicaDb.StreamLength(key);

// XRANGE: Works on primary and replica
db.StreamRange(key);
replicaDb.StreamRange(key);

// XREVRANGE: Works on primary and replica
db.StreamRange(key, messageOrder: Order.Descending);
replicaDb.StreamRange(key, messageOrder: Order.Descending);

// XREAD: Works on primary and replica
db.StreamRead(key, "0-1");
replicaDb.StreamRead(key, "0-1");

// XTRIM: Works on primary, not replica
db.StreamTrim(key, 10);
ex = Assert.Throws<RedisConnectionException>(() => replicaDb.StreamTrim(key, 10));
Assert.StartsWith("No connection (requires writable - not eligible for replica) is active/available", ex.Message);
}

[Fact]
public void StreamAddSinglePairWithAutoId()
{
Expand Down