Skip to content

Commit

Permalink
etcd peer discovery fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Jun 12, 2024
1 parent e6f1893 commit b660ffd
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 24 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_peer_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) ->
can_use_discovered_nodes(DiscoveredNodes, NodesAndProps)),
case CanUse of
true ->
?LOG_DEBUG("Selecting from nodes and props ~p",
?LOG_DEBUG("Peer discovery: Selecting from nodes and props ~p",
[NodesAndProps]),
case select_node_to_join(NodesAndProps) of
SelectedNode when SelectedNode =/= false ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/test/unit_quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-module(unit_quorum_queue_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

all() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ list_nodes() ->
end,
Fun2 = fun(_Proplist) ->
%% error logging will be done by the client
Nodes = rabbitmq_peer_discovery_etcd_v3_client:list_nodes(),
{ok, {Nodes, disc}}
[{_, Node} | _] = rabbitmq_peer_discovery_etcd_v3_client:list_nodes(),
{ok, {Node, disc}}
end,
rabbit_peer_discovery_util:maybe_backend_configured(?BACKEND_CONFIG_KEY, Fun0, Fun1, Fun2).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,13 @@ connected({call, From}, list_keys, Data = #statem_data{connection_name = Conn})
rabbit_log:debug("etcd peer discovery: will use prefix ~ts to query for node keys", [Prefix]),
{ok, #{kvs := Result}} = eetcd_kv:get(C2),
rabbit_log:debug("etcd peer discovery returned keys: ~tp", [Result]),
Values = [maps:get(value, M) || M <- Result],
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results", [length(Values)]),
ParsedNodes = lists:map(fun extract_node/1, Values),
{Successes, Failures} = lists:partition(fun filter_node/1, ParsedNodes),
JoinedString = lists:join(",", [rabbit_data_coercion:to_list(Node) || Node <- lists:usort(Successes)]),
rabbit_log:error("etcd peer discovery: successfully extracted nodes: ~ts", [JoinedString]),
lists:foreach(fun(Val) ->
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp", [Val])
end, Failures),
gen_statem:reply(From, lists:usort(Successes)),
Values = [{maps:get(create_revision, M), maps:get(value, M)} || M <- Result],
rabbit_log:debug("etcd peer discovery: listing node keys returned ~b results",
[length(Values)]),
ParsedNodes = lists:filtermap(fun extract_node/1, Values),
rabbit_log:info("etcd peer discovery: successfully extracted nodes: ~0tp",
[ParsedNodes]),
gen_statem:reply(From, lists:usort(ParsedNodes)),
keep_state_and_data.


Expand Down Expand Up @@ -298,15 +295,18 @@ registration_value(#statem_data{node_key_lease_id = LeaseID, node_key_ttl_in_sec
<<"ttl">> => TTL
})).

-spec extract_node(binary()) -> atom() | {error, any()}.

extract_node(Payload) ->
extract_node({CreatedRev, Payload}) ->
case rabbit_json:try_decode(Payload) of
{error, Error} -> {error, Error};
{error, _Error} ->
rabbit_log:error("etcd peer discovery: failed to extract node name from etcd value ~tp",
[Payload]),
false;
{ok, Map} ->
case maps:get(<<"node">>, Map, undefined) of
undefined -> undefined;
Node -> rabbit_data_coercion:to_atom(Node)
undefined ->
false;
Node ->
{true, {CreatedRev, rabbit_data_coercion:to_atom(Node)}}
end
end.

Expand Down
7 changes: 6 additions & 1 deletion deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,12 @@ registration_with_locking_test(Config) ->
?assertEqual(ok, rabbitmq_peer_discovery_etcd_v3_client:unlock(Pid, LockOwnerKey)),

Condition2 = fun() ->
[node()] =:= rabbitmq_peer_discovery_etcd_v3_client:list_nodes(Pid)
case rabbitmq_peer_discovery_etcd_v3_client:list_nodes(Pid) of
[{_, N}] when N =:= node() ->
true;
_ ->
false
end
end,
try
rabbit_ct_helpers:await_condition(Condition2, 45000)
Expand Down
12 changes: 8 additions & 4 deletions deps/rabbitmq_peer_discovery_etcd/test/unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ registration_value_test(_Config) ->
extract_nodes_case1_test(_Config) ->
Input = registration_value_of(8488283859587364900, 61),
Expected = node(),

?assertEqual(Expected, rabbitmq_peer_discovery_etcd_v3_client:extract_node(Input)),

?assertEqual(undefined, rabbitmq_peer_discovery_etcd_v3_client:extract_node(<<"{}">>)).
CreatedRev = ?LINE,
?assertEqual({true, {CreatedRev, Expected}},
rabbitmq_peer_discovery_etcd_v3_client:extract_node(
{CreatedRev, Input})),

?assertEqual(false,
rabbitmq_peer_discovery_etcd_v3_client:extract_node(
{CreatedRev, <<"{}">>})).

filter_nodes_test(_Config) ->
Input = [node(), undefined, undefined, {error, reason1}, {error, {another, reason}}],
Expand Down

0 comments on commit b660ffd

Please sign in to comment.