Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6681] fix: fix pop retry message notification #6682

Closed
wants to merge 1 commit into from

Conversation

HScarb
Copy link
Contributor

@HScarb HScarb commented May 2, 2023

Which Issue(s) This PR Fixes

Fixes #6681

Brief Description

After ReputMessageService dispatch a message from a pop retry topic, notify all consumers who subscribe to any queue of this topic.

How Did You Test This Change?

  1. Create a topic TopicTest.
  2. Start a pop push consumer using the example code PopConsumer.java to subscribe TopicTest, set popBatchSize to 1, and when receiving a message, print it and return RECONSUME_LATER.
  3. Publish a message to TopicTest.
  4. Wait and see the consumer's output

The consumer consume retry message immediately

@xdkxlk
Copy link
Contributor

xdkxlk commented May 4, 2023

Pop retry message has already been notified in PopReviveService#reviveRetry

brokerController.getPopMessageProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
    popCheckPoint.getCId(),
    -1
);
brokerController.getNotificationProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);

@HScarb
Copy link
Contributor Author

HScarb commented May 5, 2023

Pop retry message has already been notified in PopReviveService#reviveRetry

brokerController.getPopMessageProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
    popCheckPoint.getCId(),
    -1
);
brokerController.getNotificationProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);

In PopReviveService#reviveRetry, it only notifies consumers who subscribe for all queues (queue ID is -1).

But in the case which popShareQueueNum >= consumerNum - 1, the consumer will only subscribe to some of the queues.

public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
int popShareQueueNum) {
List<MessageQueue> allocateResult;
if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}
} else {
if (cidAll.size() <= mqAll.size()) {
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
for (int i = 1; i <= popShareQueueNum; i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}
return allocateResult;
}

In this case, pollingMap only contains polling requests of specific queues, and notifying -1 queueID will not wake up the polling requests.
image
The polling requests will finally be waked up until they reach the expiration time.

@xdkxlk
Copy link
Contributor

xdkxlk commented May 6, 2023

Pop retry message has already been notified in PopReviveService#reviveRetry

brokerController.getPopMessageProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
    popCheckPoint.getCId(),
    -1
);
brokerController.getNotificationProcessor().notifyMessageArriving(
    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);

In PopReviveService#reviveRetry, it only notifies consumers who subscribe for all queues (queue ID is -1).

But in the case which popShareQueueNum >= consumerNum - 1, the consumer will only subscribe to some of the queues.

public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
int popShareQueueNum) {
List<MessageQueue> allocateResult;
if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}
} else {
if (cidAll.size() <= mqAll.size()) {
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
for (int i = 1; i <= popShareQueueNum; i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}
return allocateResult;
}

In this case, pollingMap only contains polling requests of specific queues, and notifying -1 queueID will not wake up the polling requests.
image
The polling requests will finally be waked up until they reach the expiration time.

Got it

.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
final String popRetryTopic
= KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need put popRetryTopic into topicCidMap? In notifyRetryMessageArriving, the topic has been changed to normal topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because notifyMessageArriving(final String toic, final int queueId) is called by ReputMessageService, the topic won't be changed to normal topic.
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I didn't notice notifyMessageArriving is called in PopReviveService#reviveRetry.
It looks like it would be more appropriate to call notifyRetryMessageArriving in PopReviveService#reviveRetry.

@codecov-commenter
Copy link

Codecov Report

Merging #6682 (b823d35) into develop (bee5077) will decrease coverage by 0.03%.
The diff coverage is 7.40%.

@@              Coverage Diff              @@
##             develop    #6682      +/-   ##
=============================================
- Coverage      42.65%   42.62%   -0.03%     
+ Complexity      9093     9086       -7     
=============================================
  Files           1121     1121              
  Lines          79775    79795      +20     
  Branches       10409    10415       +6     
=============================================
- Hits           34027    34015      -12     
- Misses         41462    41500      +38     
+ Partials        4286     4280       -6     
Impacted Files Coverage Δ
...rocketmq/broker/processor/PopMessageProcessor.java 43.82% <0.00%> (-0.71%) ⬇️
...he/rocketmq/broker/processor/PopReviveService.java 36.29% <0.00%> (-0.18%) ⬇️
...in/java/org/apache/rocketmq/common/KeyBuilder.java 0.00% <0.00%> (ø)
...etmq/broker/longpolling/PopLongPollingService.java 17.15% <8.69%> (-2.53%) ⬇️

... and 31 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@HScarb
Copy link
Contributor Author

HScarb commented Jun 7, 2023

@xdkxlk PTAL

@HScarb HScarb requested a review from xdkxlk June 7, 2023 02:36
Copy link

This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR.

@github-actions github-actions bot added the stale label Jul 20, 2024
Copy link

This PR was closed because it has been inactive for 3 days since being marked as stale.

@github-actions github-actions bot closed this Jul 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

When pop consuming, it will not notify polling pop requests when new messages arrive at the retry topic
3 participants