Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native AMQP follow ups #10662

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
correlation_id/1,
user_id/1,
message_id/1,
property/2,
timestamp/1,
priority/1,
set_ttl/2,
Expand Down Expand Up @@ -302,6 +303,14 @@ message_id(#?MODULE{protocol = Proto,
message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec property(atom(), state()) ->
{utf8, binary()} | undefined.
property(Property, #?MODULE{protocol = Proto,
data = Data}) ->
Proto:property(Property, Data);
property(_Property, _BasicMsg) ->
undefined.

-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};
Expand Down
19 changes: 4 additions & 15 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}})
MsgId;
property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
UserId;
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
Subject;
property(_Prop, #msg{}) ->
undefined.

Expand Down Expand Up @@ -178,13 +180,6 @@ get_property(priority, Msg) ->
_ ->
undefined
end
end;
get_property(subject, Msg) ->
case Msg of
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
Subject;
_ ->
undefined
end.

convert_to(?MODULE, Msg, _Env) ->
Expand Down Expand Up @@ -430,10 +425,6 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
RoutingKeys = case get_property(subject, Msg) of
undefined -> undefined;
Subject -> [Subject]
end,

Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
{list, DeathMaps} ->
Expand All @@ -458,10 +449,8 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
?ANN_ROUTING_KEYS, RoutingKeys,
maps_put_truthy(
deaths, Deaths,
#{})))))),
deaths, Deaths,
#{}))))),
case MA of
[] ->
Anns;
Expand Down
107 changes: 67 additions & 40 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
}).

-record(incoming_link, {
exchange :: rabbit_exchange:name(),
exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
routing_key :: undefined | rabbit_types:routing_key(),
%% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(),
Expand Down Expand Up @@ -713,9 +713,9 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE,
user = User}}) ->
ok = validate_attach(Attach),
case ensure_target(Target, Vhost, User) of
{ok, XName, RoutingKey, QNameBin} ->
{ok, Exchange, RoutingKey, QNameBin} ->
IncomingLink = #incoming_link{
exchange = XName,
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
delivery_count = DeliveryCountInt,
Expand Down Expand Up @@ -945,17 +945,14 @@ handle_control(#'v1_0.flow'{handle = Handle} = Flow,
end
end;

handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
closed = Closed},
handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
State0 = #state{queue_states = QStates0,
incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks0,
outgoing_unsettled_map = Unsettled0,
cfg = #cfg{
writer_pid = WriterPid,
vhost = Vhost,
user = #user{username = Username},
channel_num = Ch}}) ->
user = #user{username = Username}}}) ->
Ctag = handle_to_ctag(HandleInt),
%% TODO delete queue if closed flag is set to true? see 2.6.6
%% TODO keep the state around depending on the lifetime
Expand Down Expand Up @@ -1011,8 +1008,7 @@ handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
incoming_links = maps:remove(HandleInt, IncomingLinks),
outgoing_links = OutgoingLinks,
outgoing_unsettled_map = Unsettled},
rabbit_amqp_writer:send_command(WriterPid, Ch, #'v1_0.detach'{handle = Handle,
closed = Closed}),
maybe_detach_reply(Detach, State, State0),
publisher_or_consumer_deleted(State, State0),
{noreply, State};

Expand Down Expand Up @@ -1533,7 +1529,7 @@ incoming_link_transfer(
rcv_settle_mode = RcvSettleMode,
handle = Handle = ?UINT(HandleInt)},
MsgPart,
#incoming_link{exchange = XName = #resource{name = XNameBin},
#incoming_link{exchange = Exchange,
routing_key = LinkRKey,
delivery_count = DeliveryCount0,
incoming_unconfirmed_map = U0,
Expand Down Expand Up @@ -1564,20 +1560,16 @@ incoming_link_transfer(
Sections = amqp10_framing:decode_bin(MsgBin),
?DEBUG("~s Inbound content:~n ~tp",
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
Anns0 = #{?ANN_EXCHANGE => XNameBin},
Anns = case LinkRKey of
undefined -> Anns0;
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
end,
Mc0 = mc:init(mc_amqp, Sections, Anns),
Mc1 = rabbit_message_interceptor:intercept(Mc0),
{Mc, RoutingKey} = ensure_routing_key(Mc1),
check_user_id(Mc, User),
messages_received(Settled),
case rabbit_exchange:lookup(XName) of
{ok, Exchange} ->
check_write_permitted_on_topic(Exchange, User, RoutingKey),
QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}),
case rabbit_exchange_lookup(Exchange) of
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
Anns = #{?ANN_EXCHANGE => XNameBin},
Mc0 = mc:init(mc_amqp, Sections, Anns),
{RoutingKey, Mc1} = ensure_routing_key(LinkRKey, Mc0),
Mc = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc, User),
messages_received(Settled),
check_write_permitted_on_topic(X, User, RoutingKey),
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
case not Settled andalso
RcvSettleMode =:= ?V_1_0_RECEIVER_SETTLE_MODE_SECOND of
Expand Down Expand Up @@ -1619,19 +1611,29 @@ incoming_link_transfer(
{error, [Disposition, Detach]}
end.

ensure_routing_key(Mc) ->
case mc:routing_keys(Mc) of
[RoutingKey] ->
{Mc, RoutingKey};
[] ->
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
%% For example, when the client attached to target /exchange/amq.fanout and sends a
%% message without setting a 'subject' in the message properties, the routing key is
%% ignored during routing, but receiving code paths still expect some routing key to be set.
DefaultRoutingKey = <<"">>,
Mc1 = mc:set_annotation(?ANN_ROUTING_KEYS, [DefaultRoutingKey], Mc),
{Mc1, DefaultRoutingKey}
end.
rabbit_exchange_lookup(X = #exchange{}) ->
{ok, X};
rabbit_exchange_lookup(XName = #resource{}) ->
rabbit_exchange:lookup(XName).

ensure_routing_key(LinkRKey, Mc0) ->
RKey = case LinkRKey of
undefined ->
case mc:property(subject, Mc0) of
undefined ->
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
%% For example, when the client attached to target /exchange/amq.fanout and sends a
%% message without setting a 'subject' in the message properties, the routing key is
%% ignored during routing, but receiving code paths still expect some routing key to be set.
<<"">>;
{utf8, Subject} ->
Subject
end;
_ ->
LinkRKey
end,
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc0),
{RKey, Mc}.

process_routing_confirm([], _SenderSettles = true, _, U) ->
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
Expand Down Expand Up @@ -1692,16 +1694,25 @@ ensure_target(#'v1_0.target'{address = Address,
{ok, Dest} ->
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)),
XNameBin = list_to_binary(XNameList1),
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
{ok, X} = rabbit_exchange:lookup(XName),
check_internal_exchange(X),
check_write_permitted(XName, User),
%% Pre-declared exchanges are protected against deletion and modification.
%% Let's cache the whole #exchange{} record to save a
%% rabbit_exchange:lookup(XName) call each time we receive a message.
Exchange = case XNameBin of
<<>> -> X;
<<"amq.", _/binary>> -> X;
_ -> XName
end,
RoutingKey = case RK of
undefined -> undefined;
[] -> undefined;
_ -> list_to_binary(RK)
end,
{ok, XName, RoutingKey, QNameBin};
{ok, Exchange, RoutingKey, QNameBin};
{error, _} = E ->
E
end;
Expand Down Expand Up @@ -2192,6 +2203,22 @@ publisher_or_consumer_deleted(
ok
end.

%% If we previously already sent a detach with an error condition, and the Detach we
%% receive here is therefore the client's reply, do not reply again with a 3rd detach.
maybe_detach_reply(Detach,
#state{incoming_links = NewIncomingLinks,
outgoing_links = NewOutgoingLinks,
cfg = #cfg{writer_pid = WriterPid,
channel_num = Ch}},
#state{incoming_links = OldIncomingLinks,
outgoing_links = OldOutgoingLinks})
when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) ->
Reply = Detach#'v1_0.detach'{error = undefined},
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply);
maybe_detach_reply(_, _, _) ->
ok.

check_internal_exchange(#exchange{internal = true,
name = XName}) ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
Expand Down
24 changes: 17 additions & 7 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -925,35 +925,45 @@ server_closes_link(QType, Config) ->

server_closes_link_exchange(Config) ->
XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>,
RoutingKey = <<"my routing key">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),

#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
exchange = XName,
routing_key = RoutingKey}),
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = <<"/exchange/", XName/binary, "/some-routing-key">>,
Address = <<"/exchange/", XName/binary, "/", RoutingKey/binary>>,
{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, Address),
ok = wait_for_credit(Sender),
?assertMatch(#{publishers := 1}, get_global_counters(Config)),

DTag1 = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
ok = wait_for_settlement(DTag1),

%% Server closes the link endpoint due to some AMQP 1.0 external condition:
%% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange.
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),

%% When we publish the next message, we expect:
%% 1. that the message is released because the exchange doesn't exist anymore, and
DTag = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<"body">>, false)),
ok = wait_for_settlement(DTag, released),
DTag2 = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
ok = wait_for_settlement(DTag2, released),
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
?assertMatch(#{publishers := 0}, get_global_counters(Config)),

#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down Expand Up @@ -3351,7 +3361,7 @@ receive_all_messages0(Receiver, Accept, Acc) ->
false -> ok
end,
receive_all_messages0(Receiver, Accept, [Msg | Acc])
after 500 ->
after 1000 ->
lists:reverse(Acc)
end.

Expand Down
Loading