Skip to content

Commit

Permalink
Merge pull request #10091 from rabbitmq/rabbit-local-random-exchange
Browse files Browse the repository at this point in the history
Add a new local random exchange type.
  • Loading branch information
kjnilsson authored Jun 12, 2024
2 parents 3d10f25 + cca64e8 commit 5f79dec
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 3 deletions.
8 changes: 8 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,14 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "rabbit_local_random_exchange_SUITE",
size = "small",
additional_beam = [
":test_queue_utils_beam",
],
)

rabbitmq_integration_suite(
name = "rabbit_direct_reply_to_prop_SUITE",
size = "medium",
Expand Down
13 changes: 12 additions & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_exchange_type_fanout.erl",
"src/rabbit_exchange_type_headers.erl",
"src/rabbit_exchange_type_invalid.erl",
"src/rabbit_exchange_type_local_random.erl",
"src/rabbit_exchange_type_topic.erl",
"src/rabbit_feature_flags.erl",
"src/rabbit_ff_controller.erl",
Expand Down Expand Up @@ -391,6 +392,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_exchange_type_fanout.erl",
"src/rabbit_exchange_type_headers.erl",
"src/rabbit_exchange_type_invalid.erl",
"src/rabbit_exchange_type_local_random.erl",
"src/rabbit_exchange_type_topic.erl",
"src/rabbit_feature_flags.erl",
"src/rabbit_ff_controller.erl",
Expand Down Expand Up @@ -670,6 +672,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_exchange_type_fanout.erl",
"src/rabbit_exchange_type_headers.erl",
"src/rabbit_exchange_type_invalid.erl",
"src/rabbit_exchange_type_local_random.erl",
"src/rabbit_exchange_type_topic.erl",
"src/rabbit_feature_flags.erl",
"src/rabbit_ff_controller.erl",
Expand Down Expand Up @@ -2048,7 +2051,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)

erlang_bytecode(
name = "test_event_recorder_beam",
testonly = True,
Expand Down Expand Up @@ -2129,3 +2131,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_local_random_exchange_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_local_random_exchange_SUITE.erl"],
outs = ["test/rabbit_local_random_exchange_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
116 changes: 116 additions & 0 deletions deps/rabbit/src/rabbit_exchange_type_local_random.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_exchange_type_local_random).
-behaviour(rabbit_exchange_type).
-include_lib("rabbit_common/include/rabbit.hrl").

-rabbit_feature_flag({?MODULE,
#{desc => "Local random exchange",
stability => stable
}}).

-rabbit_boot_step({?MODULE,
[{description, "exchange type local random"},
{mfa, {rabbit_registry, register,
[exchange, <<"x-local-random">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, kernel_ready}
]}).

-export([add_binding/3,
assert_args_equivalence/2,
create/2,
delete/2,
policy_changed/2,
description/0,
recover/2,
remove_bindings/3,
validate_binding/2,
route/3,
serialise_events/0,
validate/1,
info/1,
info/2
]).

description() ->
[{name, <<"x-local-random">>},
{description, <<"Picks one random local binding (queue) to route via (to).">>}].

route(#exchange{name = Name}, _Msg, _Opts) ->
Matches = rabbit_router:match_routing_key(Name, [<<>>]),
case lists:filter(fun filter_local_queue/1, Matches) of
[] ->
[];
[_] = One ->
One;
LocalMatches ->
Rand = rand:uniform(length(LocalMatches)),
[lists:nth(Rand, LocalMatches)]
end.

info(_X) -> [].
info(_X, _) -> [].
serialise_events() -> false.
validate(_X) ->
case rabbit_feature_flags:is_enabled(?MODULE) of
true ->
ok;
false ->
rabbit_misc:amqp_error(
precondition_failed,
"x-local-random exchange feature not available", [],
'exchange.declare')
end.

create(_Serial, _X) -> ok.
recover(_X, _Bs) -> ok.
delete(_Serial, _X) -> ok.
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _X, _B) -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.

validate_binding(_X, #binding{destination = Dest,
key = <<>>}) ->
case rabbit_amqqueue:lookup(Dest) of
{ok, Q} ->
case amqqueue:get_type(Q) of
rabbit_classic_queue ->
ok;
Type ->
{error, {binding_invalid,
"Queue type ~ts not valid for this exchange type",
[Type]}}
end;
_ ->
{error, {binding_invalid,
"Destination not found",
[]}}
end;
validate_binding(_X, #binding{key = BKey}) ->
{error, {binding_invalid,
"Non empty binding '~s' key not permitted",
[BKey]}}.

assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).

filter_local_queue(QName) ->
%% TODO: introduce lookup function that _only_ gets the pid
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
case amqqueue:get_pid(Q) of
Pid when is_pid(Pid) andalso
node(Pid) =:= node() ->
is_process_alive(Pid);
_ ->
false
end;
_ ->
false
end.
5 changes: 3 additions & 2 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ init_per_group(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
{tcp_ports_base, {skip_n_nodes, ClusterSize}}
]),
Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
Ret = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
Expand Down Expand Up @@ -263,7 +264,7 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
Config2 = rabbit_ct_helpers:set_config(Config1,
[{rmq_nodes_count, 3},
{rmq_nodename_suffix, Testcase},
{tcp_ports_base},
{tcp_ports_base, {skip_n_nodes, 3}},
{queue_name, Q},
{alt_queue_name, <<Q/binary, "_alt">>}
]),
Expand Down
Loading

0 comments on commit 5f79dec

Please sign in to comment.