Skip to content

Commit

Permalink
Merge pull request #11934 from rabbitmq/mergify/bp/v4.0.x/pr-11933
Browse files Browse the repository at this point in the history
QQ: refactor and improve leader detection code. (backport #11933)
  • Loading branch information
kjnilsson authored Aug 7, 2024
2 parents 08f2994 + 7d8b870 commit 7ae3207
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
66 changes: 55 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ is_compatible(_, _, _) ->
init(Q) when ?is_amqqueue(Q) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
{Name, _} = MaybeLeader = amqqueue:get_pid(Q),
Leader = case ra_leaderboard:lookup_leader(Name) of
Leader = case find_leader(Q) of
undefined ->
%% leader from queue record will have to suffice
MaybeLeader;
Expand Down Expand Up @@ -1349,6 +1349,23 @@ shrink_all(Node) ->
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, cluster_change_not_permitted} ->
%% this could be timing related and due to a new leader just being
%% elected but its noop command not having been committed yet.
%% let's sleep and retry once
rabbit_log:info("~ts: failed to remove member (replica) on node ~w "
"as cluster change is not permitted. "
"retrying once in 500ms",
[rabbit_misc:rs(QName), Node]),
timer:sleep(500),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, Err} ->
rabbit_log:warning("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end;
{error, Err} ->
rabbit_log:warning("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
Expand Down Expand Up @@ -1663,10 +1680,16 @@ open_files(Name) ->
end.

%% Reports the node hosting the leader of queue Q, or the empty atom ''
%% when no live leader process can be confirmed.
%% NOTE(review): this span is a diff rendering that interleaves the removed
%% body (based on amqqueue:get_pid/1) with the added body (based on
%% find_leader/1) -- confirm which lines are live before relying on it.
%% In both variants the node is only returned when is_process_alive/2
%% reports true for the {Name, Node} pair; the find_leader/1 variant
%% additionally yields '' when find_leader/1 returns undefined.
leader(Q) when ?is_amqqueue(Q) ->
{Name, Leader} = amqqueue:get_pid(Q),
case is_process_alive(Name, Leader) of
true -> Leader;
false -> ''
case find_leader(Q) of
undefined ->
'';
{Name, LeaderNode} ->
case is_process_alive(Name, LeaderNode) of
true ->
LeaderNode;
false ->
''
end
end.

peek(Vhost, Queue, Pos) ->
Expand Down Expand Up @@ -1742,12 +1765,6 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
{leader, LeaderNode},
{online, Online}].

is_process_alive(Name, Node) ->
    %% Ask Node which process, if any, is registered under Name; only a pid
    %% answer counts as alive, any other result from erpc_call/5 is "not alive".
    %% This check feeds metrics and stats, so extra latency is not warranted:
    %% erpc_call/5 is expected to avoid the rpc when Node is not already
    %% connected (NOTE(review): confirm against erpc_call/5).
    WhereisResult = erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT),
    erlang:is_pid(WhereisResult).

-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().

quorum_messages(QName) ->
Expand Down Expand Up @@ -1930,3 +1947,30 @@ wait_for_projections(Node, QName, N) ->
timer:sleep(100),
wait_for_projections(Node, QName, N - 1)
end.

find_leader(Q) when ?is_amqqueue(Q) ->
    %% The pid field of the queue record is updated asynchronously after a
    %% leader change, so it is likely to be more stale than the leaderboard.
    %% Candidates are therefore ordered leaderboard first, queue record second.
    {Name, _} = RecordLeader = amqqueue:get_pid(Q),
    Candidates = case ra_leaderboard:lookup_leader(Name) of
                     undefined ->
                         %% nothing in the leaderboard: the queue record
                         %% is the only candidate available
                         [RecordLeader];
                     BoardLeader ->
                         [BoardLeader, RecordLeader]
                 end,
    %% a candidate is only usable if its node is this node or one we are
    %% currently connected to
    Reachable = [node() | nodes()],
    IsUnreachable = fun({_, CandidateNode}) ->
                            not lists:member(CandidateNode, Reachable)
                    end,
    %% drop candidates until the first reachable one; 'undefined' if none
    case lists:dropwhile(IsUnreachable, Candidates) of
        [Leader | _] ->
            Leader;
        [] ->
            undefined
    end.

is_process_alive(Name, Node) ->
    %% Ask Node which process, if any, is registered under Name; only a pid
    %% answer counts as alive, any other result from erpc_call/5 is "not alive".
    %% This check feeds metrics and stats, so extra latency is not warranted:
    %% erpc_call/5 is expected to avoid the rpc when Node is not already
    %% connected (NOTE(review): confirm against erpc_call/5).
    WhereisResult = erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT),
    erlang:is_pid(WhereisResult).
1 change: 1 addition & 0 deletions deps/rabbit/test/cli_forget_cluster_node_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(nowarn_export_all).
-compile(export_all).

-import(clustering_utils, [
Expand Down

0 comments on commit 7ae3207

Please sign in to comment.