Skip to content

Commit

Permalink
Store incoming max_message_size in #incoming_link{}
Browse files Browse the repository at this point in the history
This keeps functions pure and ensures that existing links do not break
if an operator were to dynamically change the server's max_message_size.

Each link now has a max_message_size:
* incoming links as determined by RabbitMQ config
* outgoing links as determined by the client
  • Loading branch information
ansd committed Aug 8, 2024
1 parent 3e708bc commit 28bd6d4
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
routing_key :: rabbit_types:routing_key() | to | subject,
%% queue_name_bin is only set if the link target address refers to a queue.
queue_name_bin :: undefined | rabbit_misc:resource_name(),
max_message_size :: pos_integer(),
delivery_count :: sequence_no(),
credit :: rabbit_queue_type:credit(),
%% TRANSFER delivery IDs published to queues but not yet confirmed by queues
Expand Down Expand Up @@ -999,10 +1000,12 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
ok = validate_attach(Attach),
case ensure_target(Target, Vhost, User, PermCache0) of
{ok, Exchange, RoutingKey, QNameBin, PermCache} ->
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCountInt,
credit = MaxLinkCredit},
_Outcomes = outcomes(Source),
Expand All @@ -1015,7 +1018,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
target = Target,
%% We are the receiver.
role = ?AMQP_ROLE_RECEIVER,
max_message_size = {ulong, persistent_term:get(max_message_size)}},
max_message_size = {ulong, MaxMessageSize}},
Flow = #'v1_0.flow'{handle = Handle,
delivery_count = DeliveryCount,
link_credit = ?UINT(MaxLinkCredit)},
Expand Down Expand Up @@ -2248,6 +2251,7 @@ incoming_link_transfer(
settled = Settled},
MsgPart,
Link0 = #incoming_link{
max_message_size = MaxMessageSize,
multi_transfer_msg = Multi = #multi_transfer_msg{
payload_fragments_rev = PFR0,
delivery_id = FirstDeliveryId,
Expand All @@ -2257,7 +2261,7 @@ incoming_link_transfer(
validate_multi_transfer_delivery_id(DeliveryId, FirstDeliveryId),
validate_multi_transfer_settled(Settled, FirstSettled),
PFR = [MsgPart | PFR0],
validate_incoming_message_size(PFR),
validate_message_size(PFR, MaxMessageSize),
Link = Link0#incoming_link{multi_transfer_msg = Multi#multi_transfer_msg{payload_fragments_rev = PFR}},
{ok, [], Link, State};
incoming_link_transfer(
Expand All @@ -2277,6 +2281,7 @@ incoming_link_transfer(
MsgPart,
#incoming_link{exchange = LinkExchange,
routing_key = LinkRKey,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount0,
incoming_unconfirmed_map = U0,
credit = Credit0,
Expand Down Expand Up @@ -2306,7 +2311,7 @@ incoming_link_transfer(
{MsgBin0, FirstDeliveryId, FirstSettled}
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_incoming_message_size(PayloadBin),
validate_message_size(PayloadBin, MaxMessageSize),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
Expand Down Expand Up @@ -3034,9 +3039,6 @@ validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled =
validate_transfer_rcv_settle_mode(_, _) ->
ok.

validate_incoming_message_size(Message) ->
validate_message_size(Message, persistent_term:get(max_message_size)).

validate_message_size(_, unlimited) ->
ok;
validate_message_size(Message, MaxMsgSize)
Expand All @@ -3050,7 +3052,7 @@ validate_message_size(Message, MaxMsgSize)
%% We apply that sentence to both incoming messages that are too large for us and outgoing messages that are
%% too large for the client.
%% This is an interesting protocol difference to MQTT where we instead discard outgoing messages that are too
%% large to send then behave as if we had completed sending that message [MQTT 5.0, MQTT-3.1.2-25].
%% large to send and then behave as if we had completed sending that message [MQTT 5.0, MQTT-3.1.2-25].
protocol_error(
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
"message size (~b bytes) > maximum message size (~b bytes)",
Expand Down

0 comments on commit 28bd6d4

Please sign in to comment.