Subscription unsubscribes unexpectedly #1586

Closed
winseros opened this issue Oct 15, 2020 · 2 comments · Fixed by #1947

@winseros

Greetings.

We are facing a situation where subscriptions sometimes get unsubscribed on their own.

The application code looks simple:

```csharp
_redis = ConnectionMultiplexer.Connect(_redisConnectionString);
_subscriber = _redis.GetSubscriber();
_subscriber.Subscribe("inbox-channel").OnMessage(OnMessage);
```

The ConnectionMultiplexer log has nothing suspicious:

```
Connecting 172.22.0.2:6379/Interactive...
BeginConnect: 172.22.0.2:6379
Connected Interactive/172.22.0.2:6379
Server handshake
Authenticating (user/password)
Setting client name: SvcInboxApp
Auto-configure...
Sending critical tracer: Interactive/172.22.0.2:6379
Writing to Interactive/172.22.0.2:6379: ECHO
Flushing outbound buffer
Starting read
Response from Interactive/172.22.0.2:6379 / ECHO: BulkString: 16 bytes
1 unique nodes specified
Writing to Interactive/172.22.0.2:6379: PING
Requesting tie-break from 172.22.0.2:6379 > __Booksleeve_TieBreak...
Writing to Interactive/172.22.0.2:6379: GET __Booksleeve_TieBreak
Allowing endpoints 00:00:40 to respond...
Awaiting task completion, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
Response from Interactive/172.22.0.2:6379 / PING: SimpleString: PONG
Response from Interactive/172.22.0.2:6379 / GET __Booksleeve_TieBreak: (null)
All tasks completed cleanly, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
172.22.0.2:6379 returned with success
Waiting for tiebreakers...
All tasks are already complete
172.22.0.2:6379 had no tiebreaker set
Single master detected: 172.22.0.2:6379
172.22.0.2:6379: Standalone v2.0.0, master; keep-alive: 00:01:00; int: ConnectedEstablished; sub: ConnectedEstablishing
172.22.0.2:6379: int ops=11, qu=0, qs=0, qc=0, wr=0, socks=1; sub ops=4, qu=0, qs=4, qc=0, wr=0, socks=1
Circular op-count snapshot; int: 0+11=11 (1.10 ops/s; spans 10s); sub: 0+4=4 (0.40 ops/s; spans 10s)
Sync timeouts: 0; async timeouts: 0; fire and forget: 0; last heartbeat: -1s ago
Starting heartbeat...
```

But when we recompiled the library with the VERBOSE flag and captured the traces, we got:

```
08:17:28.621 +00:00|OnHeartbeat: ConnectedEstablished
08:17:28.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.608 +00:00|ReconfigureAsync|Exiting reconfiguration...
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|MultiBulk: 3 items
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Matching result...
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Response to: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|StackExchange.Redis.ResultProcessor+TrackSubscriptionsProcessor|Completed with success: MultiBulk: 3 items (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Processed 1 messages
08:17:29.621 +00:00|OnHeartbeat|heartbeat
08:17:29.621 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.621 +00:00|172.22.0.2:6379|Now usable
08:17:29.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.621 +00:00|OnHeartbeat|heartbeat
08:17:30.622 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.622 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
```

Please pay attention to the lines:

```
08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.621 +00:00|172.22.0.2:6379|Now usable
```

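They show that, for some reason, the ConnectionMultiplexer marked the server as unusable (DidNotRespond), sent `UNSUBSCRIBE inbox-channel`, and did not restore the subscription when the server became usable again about 13 ms later.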

Looking at the source code here gives some clues:

```csharp
// Excerpt from the multiplexer's reconfiguration: per-endpoint handling inside a
// loop over the configured endpoints (the loop header is not shown).
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
{
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    var aex = task.Exception;
    foreach (var ex in aex.InnerExceptions)
    {
        log?.WriteLine($"{Format.ToString(endpoints[i])} faulted: {ex.Message}");
        failureMessage = ex.Message;
    }
}
else if (task.IsCanceled)
{
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    log?.WriteLine($"{Format.ToString(endpoints[i])} was canceled");
}
else if (task.IsCompleted)
{
    var server = servers[i];
    if (task.Result)
    {
        servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} returned with success");
        // count the server types
        switch (server.ServerType)
        {
            case ServerType.Twemproxy:
            case ServerType.Standalone:
                standaloneCount++;
                break;
            case ServerType.Sentinel:
                sentinelCount++;
                break;
            case ServerType.Cluster:
                clusterCount++;
                break;
        }
        if (clusterCount > 0 && !encounteredConnectedClusterServer)
        {
            // we have encountered a connected server with clustertype for the first time.
            // so we will get list of other nodes from this server using "CLUSTER NODES" command
            // and try to connect to these other nodes in the next iteration
            encounteredConnectedClusterServer = true;
            updatedClusterEndpointCollection = await GetEndpointsFromClusterNodes(server, log).ForAwait();
        }
        // set the server UnselectableFlags and update masters list
        switch (server.ServerType)
        {
            case ServerType.Twemproxy:
            case ServerType.Sentinel:
            case ServerType.Standalone:
            case ServerType.Cluster:
                servers[i].ClearUnselectable(UnselectableFlags.ServerType);
                if (server.IsReplica)
                {
                    servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
                }
                else
                {
                    masters.Add(server);
                }
                break;
            default:
                servers[i].SetUnselectable(UnselectableFlags.ServerType);
                break;
        }
    }
    else
    {
        servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} returned, but incorrectly");
    }
}
else
{
    // Not faulted, canceled, or completed: the task is still pending
    // (e.g. WaitingForActivation) and is treated exactly like no response.
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    log?.WriteLine($"{Format.ToString(endpoints[i])} did not respond");
}
} // end of the loop over endpoints
```

The multiplexer sends PING/ECHO messages to the server and expects a response within a timeout. If the response is missing, wrong, or faulted, the server is considered unreachable and its subscriptions are dropped.
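
The check described above can be sketched with plain `System.Threading.Tasks` calls. This is a minimal illustration, not the library's code: `Classify` and `ProbeAsync` are hypothetical names, the branch order mirrors the quoted excerpt, and modeling the timeout with `Task.WhenAny`/`Task.Delay` is an assumption about the mechanism. Note that `IsCompleted` is also true for faulted and canceled tasks, which is why the order of the checks matters.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not a StackExchange.Redis API) mirroring the excerpt's
// branch order. IsCompleted is also true for faulted/canceled tasks, so those
// checks must come first.
static string Classify(Task<bool> task)
{
    if (task.IsFaulted) return "faulted";                 // -> DidNotRespond
    if (task.IsCanceled) return "was canceled";           // -> DidNotRespond
    if (task.IsCompleted)
        return task.Result ? "returned with success"      // -> usable
                           : "returned, but incorrectly"; // -> DidNotRespond
    return "did not respond";                             // still pending -> DidNotRespond
}

// Hypothetical probe (assumed mechanism): give the reply up to `timeout` to
// arrive, then classify whatever state the task is in at that moment.
static async Task<string> ProbeAsync(Task<bool> probe, TimeSpan timeout)
{
    await Task.WhenAny(probe, Task.Delay(timeout));
    return Classify(probe);
}

string ok = await ProbeAsync(Task.FromResult(true), TimeSpan.FromMilliseconds(50));
string pending = await ProbeAsync(new TaskCompletionSource<bool>().Task, TimeSpan.FromMilliseconds(50));
Console.WriteLine(ok);      // returned with success
Console.WriteLine(pending); // did not respond
```

A reply that is merely late is indistinguishable here from one that never comes: both end in the last branch.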

In our case, however, the ping task is in the WaitingForActivation state. My guess is that (perhaps due to high application load) it had not even started executing.
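
One detail worth keeping in mind when reading that status: for tasks returned by `async` methods (and by `TaskCompletionSource`), .NET reports `WaitingForActivation` for the whole time the operation is pending, including after the method has started running and is awaiting something. So the trace shows that the probe had not completed when it was inspected; the status alone does not show whether the work had started. A minimal demonstration using only the BCL, where `PingAsync` and `reply` are hypothetical stand-ins rather than StackExchange.Redis members:

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for "the server's reply"; it stays incomplete until we set it.
var reply = new TaskCompletionSource<bool>();

// An async method that has already started and is now awaiting the reply.
async Task<bool> PingAsync()
{
    bool result = await reply.Task; // execution suspends here
    return result;
}

Task<bool> ping = PingAsync();
TaskStatus whilePending = ping.Status;
Console.WriteLine(whilePending);     // WaitingForActivation
Console.WriteLine(ping.IsCompleted); // False

reply.SetResult(true);
bool value = await ping;
Console.WriteLine(ping.Status);      // RanToCompletion
```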

This behavior looks like a bug to me, and we'd appreciate it if it could be addressed.

@NickCraver
Collaborator

Tagging for #1912

NickCraver added a commit that referenced this issue Feb 4, 2022
We're working on pub/sub - breaking it out explicitly from #1912. This relates to several issues and, in general, to handling resubscriptions on reconnect.

Issues: #1110, #1586, #1830, #1835

There are a few things in play we're investigating:
- [x] Subscription heartbeat not going over the subscription connection (due to `PING` and `GetBridge`)
- [x] Subscriptions not reconnecting at all (or potentially doing so and unsubscribing, according to some issues)
- [x] Subscriptions always going to a single cluster node (due to `default(RedisKey)`)

Overall this set of changes:
- Completely restructures how RedisSubscriber works
  - No more `PendingSubscriptionState` (`Subscription` has the needed bits to reconnect)
  - Cleaner method topology (in `RedisSubscriber`, rather than `Subscriber`, `RedisSubscriber`, and `ConnectionMultiplexer`)
    - By placing these on `RedisSubscriber`, we can cleanly use `ExecuteSync/Async` bits, get proper profiling, etc.
  - Proper sync/async split (rather than `Wait()` in sync paths)
- Changes how subscriptions work
  - The `Subscription` object is added to the `ConnectionMultiplexer` tracking immediately, but the command itself actually goes to the server and back (unless FireAndForget) before returning for proper ordering like other commands.
  - No more `Task.Run()` loop - we now ensure reconnects as part of the handshake
  - Subscriptions are marked as not having a server the moment a disconnect is fired
    - Question: Should we have a throttle around this for massive numbers of connections, or async it?
- Changes how connecting works
  - The connection completion handler will now fire when the _second_ bridge/connection completes. This means we won't have `interactive` connected but `subscription` in an unknown state: both are connected before we fire the handler, so the moment we come back from connect, subscriptions are in business.
- Moves to a `ConcurrentDictionary` since we only need limited locking around this and we only have it once per multiplexer.
  - TODO: This needs eyes; we could shift it. The implementation changed along the way, so this isn't a critical detail.
- Fixes the `TrackSubscriptionsProcessor` - this was never setting the result, but nobody noticed in 8 years because downstream code never cared.
  - Note: each `Subscription` has a processor instance (with minimal state) because when the subscription command comes back _then_ we need to decide if it successfully registered (if it didn't, we need to maintain it has no successful server)
- `ConnectionMultiplexer` grew a `DefaultSubscriber` for running some commands without lots of method duplication, e.g. ensuring servers are connected.
- Overrides `GetHashSlot` on `CommandChannelBase` with the new `RedisChannel`-based methods so that it operates correctly
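
A rough model of the subscription bookkeeping described in this list: track immediately, record a server only when the reply confirms it, mark server-less the moment a disconnect fires, and re-send during the next handshake. Everything here is hypothetical and simplified (these are not the library's `Subscription` or `RedisSubscriber` types). A `ConcurrentDictionary` maps each channel to the endpoint currently serving it, or `null`, and the reply shape assumes the standard RESP reply to `SUBSCRIBE`: an array of `subscribe`, the channel, and the subscription count.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: channel -> endpoint serving it; null = needs (re)subscribing.
var subscriptions = new ConcurrentDictionary<string, string?>();

// Tracked immediately, before the server has confirmed anything.
void Track(string channel) => subscriptions.TryAdd(channel, null);

// Only a confirming reply ("subscribe", channel, count) records the endpoint;
// anything else leaves the subscription without a server.
void OnSubscribeReply(string endpoint, string[] reply)
{
    if (reply.Length == 3 && reply[0] == "subscribe" && subscriptions.ContainsKey(reply[1]))
        subscriptions[reply[1]] = endpoint;
}

// On disconnect, immediately mark everything on that endpoint as server-less.
void OnDisconnected(string endpoint)
{
    foreach (var kv in subscriptions)
        if (kv.Value == endpoint)
            subscriptions.TryUpdate(kv.Key, null, endpoint);
}

// What the next handshake would need to re-send.
List<string> PendingResubscribes() =>
    subscriptions.Where(kv => kv.Value is null).Select(kv => kv.Key).OrderBy(c => c).ToList();

Track("inbox-channel");
OnSubscribeReply("172.22.0.2:6379", new[] { "subscribe", "inbox-channel", "1" });
OnDisconnected("172.22.0.2:6379");
Console.WriteLine(string.Join(",", PendingResubscribes())); // inbox-channel
```

Enumerating a `ConcurrentDictionary` while updating it is safe, which keeps `OnDisconnected` lock-free in this sketch.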

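The new connection-completion rule (signal only once both the interactive and the subscription connections are up) can be pictured as `Task.WhenAll` over the two connection signals. A minimal sketch with hypothetical names; the real bridges are not modeled.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for "this connection is established" on each bridge.
var interactive = new TaskCompletionSource<bool>();
var subscription = new TaskCompletionSource<bool>();

// Completes only when the later of the two connections completes.
Task bothConnected = Task.WhenAll(interactive.Task, subscription.Task);

interactive.SetResult(true);
bool afterFirst = bothConnected.IsCompleted;  // subscription still pending

subscription.SetResult(true);
await bothConnected;
bool afterSecond = bothConnected.IsCompleted;

Console.WriteLine($"{afterFirst} {afterSecond}"); // False True
```
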
Not directly related changes which helped here:
- Better profiler helpers for tests and profiler logging in them
- Re-enables a few `PubSub` tests that were unreliable before...but correctly so.

TODO: I'd like to add a few more test scenarios here:
- [x] Simple Subscribe/Publish/await Until/check pattern to ensure back-to-back subscribe/publish works well
- [x] Cluster connection failure and subscriptions moving to another node
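
The "Subscribe/Publish/await Until/check" pattern boils down to polling a condition with a timeout instead of assuming delivery is instantaneous. `UntilAsync` is a hypothetical helper name, and a background `Task.Run` stands in for the subscriber's handler, so no Redis server is needed for this sketch.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical test helper: poll `condition` until it holds or `timeout` elapses.
static async Task<bool> UntilAsync(Func<bool> condition, TimeSpan timeout)
{
    var clock = Stopwatch.StartNew();
    while (!condition())
    {
        if (clock.Elapsed >= timeout) return false;
        await Task.Delay(10); // short poll interval
    }
    return true;
}

// Stand-in for "publish, then the handler runs later on another thread".
int received = 0;
_ = Task.Run(async () =>
{
    await Task.Delay(50);
    Interlocked.Increment(ref received);
});

bool arrived = await UntilAsync(() => Volatile.Read(ref received) == 1, TimeSpan.FromSeconds(5));
Console.WriteLine(arrived); // True
```

Returning `false` on timeout (rather than throwing) lets the test's own assertion report what was expected.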

To consider:
- [x] Subscription await loop from EnsureSubscriptionsAsync and connection impact on large reconnect situations
   - In a reconnect case, this is background and only the nodes affected have any latency...but still.
- [ ] TODOs in code around variadic commands, e.g. re-subscribing with far fewer commands by using `SUBSCRIBE <key1> <key2>...`
   - In cluster, we'd have to batch per slot...or just go for the first available node
   - ...but if we go for the first available node, the semantics of `IsConnected` are slightly off in the not connected (`CurrentServer is null`) case, because we'd say we're connected to where it _would_ go even though that'd be non-deterministic without hashslot batching. I think this is really minor and shouldn't affect our decision.
- [x] `ConcurrentDictionary` vs. returning to locks around a `Dictionary`
   - ...but if we have to lock on firing consumption of handlers anyway, concurrency overhead is probably a wash.
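
The per-slot batching mentioned in the variadic `SUBSCRIBE` TODO could look roughly like this. It is a sketch under assumptions: it uses the key-slot rule from the Redis Cluster specification (CRC16 mod 16384, with `{hash tag}` handling) and applies it to channel names, assuming channels are routed by slot in the way the `GetHashSlot` change above describes. `BatchSubscribes` is hypothetical, and the sketch ignores patterns and the fact that one node owns many slots (per-node batching would group further).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// CRC16/XMODEM (poly 0x1021, init 0), the checksum Redis Cluster uses for key slots.
static ushort Crc16(byte[] data)
{
    ushort crc = 0;
    foreach (byte b in data)
    {
        crc ^= (ushort)(b << 8);
        for (int bit = 0; bit < 8; bit++)
            crc = (crc & 0x8000) != 0 ? (ushort)((crc << 1) ^ 0x1021) : (ushort)(crc << 1);
    }
    return crc;
}

// Slot = CRC16 mod 16384. If a non-empty section exists between the first '{'
// and the next '}', only that section is hashed.
static int HashSlot(string name)
{
    int open = name.IndexOf('{');
    if (open >= 0)
    {
        int close = name.IndexOf('}', open + 1);
        if (close > open + 1) name = name.Substring(open + 1, close - open - 1);
    }
    return Crc16(Encoding.UTF8.GetBytes(name)) % 16384;
}

// Hypothetical batching: one variadic SUBSCRIBE per slot, channels in input order.
static Dictionary<int, string> BatchSubscribes(IEnumerable<string> channels) =>
    channels.GroupBy(c => HashSlot(c))
            .ToDictionary(g => g.Key, g => "SUBSCRIBE " + string.Join(" ", g));

var batches = BatchSubscribes(new[] { "{user1000}.following", "{user1000}.followers", "inbox-channel" });
foreach (var kv in batches)
    Console.WriteLine($"{kv.Key}: {kv.Value}");
```
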
@NickCraver
Collaborator

This is drastically improved in #1947 and will be in the 2.5 release - thank you for the detail!

@NickCraver NickCraver linked a pull request Feb 6, 2022 that will close this issue