diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index a6020b0e02b5..eb92b3670e9a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -180,7 +180,7 @@ is_compatible(_, _, _) -> init(Q) when ?is_amqqueue(Q) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), {Name, _} = MaybeLeader = amqqueue:get_pid(Q), - Leader = case ra_leaderboard:lookup_leader(Name) of + Leader = case find_leader(Q) of undefined -> %% leader from queue record will have to suffice MaybeLeader; @@ -1349,6 +1349,23 @@ shrink_all(Node) -> case delete_member(Q, Node) of ok -> {QName, {ok, Size-1}}; + {error, cluster_change_not_permitted} -> + %% this could be timing related and due to a new leader just being + %% elected but its noop command not having been committed yet. + %% let's sleep and retry once + rabbit_log:info("~ts: failed to remove member (replica) on node ~w " + "as cluster change is not permitted. " + "retrying once in 500ms", + [rabbit_misc:rs(QName), Node]), + timer:sleep(500), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, Err} -> + rabbit_log:warning("~ts: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end; {error, Err} -> rabbit_log:warning("~ts: failed to remove member (replica) on node ~w, error: ~w", [rabbit_misc:rs(QName), Node, Err]), @@ -1663,10 +1680,16 @@ open_files(Name) -> end. leader(Q) when ?is_amqqueue(Q) -> - {Name, Leader} = amqqueue:get_pid(Q), - case is_process_alive(Name, Leader) of - true -> Leader; - false -> '' + case find_leader(Q) of + undefined -> + ''; + {Name, LeaderNode} -> + case is_process_alive(Name, LeaderNode) of + true -> + LeaderNode; + false -> + '' + end end. peek(Vhost, Queue, Pos) -> @@ -1742,12 +1765,6 @@ format(Q, Ctx) when ?is_amqqueue(Q) -> {leader, LeaderNode}, {online, Online}]. 
-is_process_alive(Name, Node) -> - %% don't attempt rpc if node is not already connected - %% as this function is used for metrics and stats and the additional - %% latency isn't warranted - erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). - -spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer(). quorum_messages(QName) -> @@ -1930,3 +1947,30 @@ wait_for_projections(Node, QName, N) -> timer:sleep(100), wait_for_projections(Node, QName, N - 1) end. + +find_leader(Q) when ?is_amqqueue(Q) -> + %% the get_pid field in the queue record is updated async after a leader + %% change, so is likely to be more stale than the leaderboard, which is tried first + {Name, _Node} = MaybeLeader = amqqueue:get_pid(Q), + Leaders = case ra_leaderboard:lookup_leader(Name) of + undefined -> + %% leader from queue record will have to suffice + [MaybeLeader]; + LikelyLeader -> + [LikelyLeader, MaybeLeader] + end, + Nodes = [node() | nodes()], + case lists:search(fun ({_Nm, Nd}) -> + lists:member(Nd, Nodes) + end, Leaders) of + {value, Leader} -> + Leader; + false -> + undefined + end. + +is_process_alive(Name, Node) -> + %% don't attempt rpc if node is not already connected + %% as this function is used for metrics and stats and the additional + %% latency isn't warranted; NOTE(review): the connectivity check is presumably inside erpc_call/5 - confirm + erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). diff --git a/deps/rabbit/test/cli_forget_cluster_node_SUITE.erl b/deps/rabbit/test/cli_forget_cluster_node_SUITE.erl index 6a8293c66409..b088cf68daff 100644 --- a/deps/rabbit/test/cli_forget_cluster_node_SUITE.erl +++ b/deps/rabbit/test/cli_forget_cluster_node_SUITE.erl @@ -12,6 +12,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). +-compile(nowarn_export_all). -compile(export_all). -import(clustering_utils, [