Skip to content

Commit

Permalink
Remove availability of CQv1
Browse files Browse the repository at this point in the history
We reject CQv1 in rabbit.schema as well.

Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.

We don't shard the CQ property suite anymore:
there's only 1 case remaining.
  • Loading branch information
lhoguin committed May 13, 2024
1 parent d180474 commit ecf4600
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 823 deletions.
2 changes: 0 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "classic_queue_prop_SUITE",
size = "large",
shard_count = 6,
sharding_method = "case",
deps = [
"@proper//:erlang_app",
],
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2538,8 +2538,8 @@ end}.

{translation, "rabbit.classic_queue_default_version",
fun(Conf) ->
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
1 -> 1;
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
1 -> cuttlefish:invalid("Classic queues v1 are no longer supported");
2 -> 2;
_ -> cuttlefish:unset()
end
Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,

init_queue_version(Version0, State = #q {backing_queue = BQ,
backing_queue_state = BQS}) ->
%% When the version is undefined we use the default version 1.
%% We want to BQ:set_queue_version in all cases because a v2
%% policy might have been deleted, for example, and we want
%% the queue to go back to v1.
Version = case Version0 of
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
undefined -> 2;
_ -> Version0
end,
BQS1 = BQ:set_queue_version(Version, BQS),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms,
end,
State#qi{ confirms = sets:new([{version,2}]) }.

-spec needs_sync(state()) -> 'false'.
-spec needs_sync(state()) -> 'false' | 'confirms'.

needs_sync(State = #qi{ confirms = Confirms }) ->
?DEBUG("~0p", [State]),
Expand Down
89 changes: 7 additions & 82 deletions deps/rabbit/src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
'undefined' | non_neg_integer(), qistate()}.

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) ->
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
%% We only allow using this module when converting to v2.
convert) ->
#{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
put(segment_entry_count, SegmentEntryCount),
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
Expand All @@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context);
non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
RecoveredCounts -> init_clean(RecoveredCounts, State1)
end;
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context)
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.

-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
Expand Down Expand Up @@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
-define(RECOVER_BYTES, 2).
-define(RECOVER_COUNTER_SIZE, 2).

init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
Expand Down Expand Up @@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
%% recovery fails with a crash.
State2 = flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
case Context of
convert ->
{Count, Bytes, State2};
main ->
%% We try to see if there are segment files from the v2 index.
case rabbit_file:wildcard(".*\\.qi", Dir) of
%% We are recovering a dirty queue that was using the v2 index or in
%% the process of converting from v2 to v1.
[_|_] ->
#resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name,
rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert",
[QName, VHost, Count]),
CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []),
State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef),
{Count + counters:get(CountersRef, ?RECOVER_COUNT),
Bytes + counters:get(CountersRef, ?RECOVER_BYTES),
State3};
%% Otherwise keep default values.
[] ->
{Count, Bytes, State2}
end
end.

recover_index_v2_dirty(State0 = #qistate { queue_name = Name,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun },
ContainsCheckFun, CountersRef) ->
#resource{virtual_host = VHost, name = QName} = Name,
rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]),
%% We cannot use the counts/bytes because some messages may be in both
%% the v1 and v2 indexes after a crash.
{_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true,
ContainsCheckFun, OnSyncFun, OnSyncMsgFun,
convert),
State = recover_index_v2_common(State0, V2State, CountersRef),
rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1",
[QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]),
State.

%% At this point all messages are persistent because transient messages
%% were dropped during the v2 index recovery.
recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir },
V2State, CountersRef) ->
%% Use a temporary per-queue store state to read embedded messages.
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
%% Go through the v2 index and publish messages to v1 index.
{LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State),
%% When resuming after a crash we need to double check the messages that are both
%% in the v1 and v2 index (effectively the messages below the upper bound of the
%% v1 index that are about to be written to it).
{_, V1HiSeqId, _} = bounds(State0),
SkipFun = fun
(SeqId, FunState0) when SeqId < V1HiSeqId ->
case read(SeqId, SeqId + 1, FunState0) of
%% Message already exists, skip.
{[_], FunState} ->
{skip, FunState};
%% Message doesn't exist, write.
{[], FunState} ->
{write, FunState}
end;
%% Message is out of bounds of the v1 index.
(_, FunState) ->
{write, FunState}
end,
%% We use a common function also used with conversion on policy change.
{State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0,
{CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES},
LoSeqId, HiSeqId, SkipFun),
%% Delete any remaining v2 index files.
OldFiles = rabbit_file:wildcard(".*\\.qi", Dir)
++ rabbit_file:wildcard(".*\\.qs", Dir),
_ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles],
%% Ensure that everything in the v1 index is written to disk.
State = flush(State1),
%% Clean up all the garbage that we have surely been creating.
garbage_collect(),
State.
{Count, Bytes, State2}.

terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
Expand Down
Loading

0 comments on commit ecf4600

Please sign in to comment.