diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index ed277a711d0..6b417146915 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -118,6 +118,13 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set subList, boolean isNotifyConsumerIdsChangedEnable) { + return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, + isNotifyConsumerIdsChangedEnable, true); + } + + public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, + ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, + final Set subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) { long start = System.currentTimeMillis(); ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { @@ -131,7 +138,10 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); - boolean r2 = consumerGroupInfo.updateSubscription(subList); + boolean r2 = false; + if (updateSubscription) { + r2 = consumerGroupInfo.updateSubscription(subList); + } if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 61b3fd04e61..1f58e70661b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -105,7 +105,7 @@ public CompletableFuture heartbeat(ProxyContext ctx, Heartbea case SIMPLE_CONSUMER: { validateConsumerGroup(request.getGroup()); String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()); - this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList()); + this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList(), false); break; } default: { @@ -227,7 +227,7 @@ protected TelemetryCommand processClientSettings(ProxyContext ctx, TelemetryComm if (settings.hasSubscription()) { validateConsumerGroup(settings.getSubscription().getGroup()); String groupName = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); - GrpcClientChannel consumerChannel = registerConsumer(ctx, groupName, settings.getClientType(), settings.getSubscription().getSubscriptionsList()); + GrpcClientChannel consumerChannel = registerConsumer(ctx, groupName, settings.getClientType(), settings.getSubscription().getSubscriptionsList(), true); consumerChannel.setClientObserver(responseObserver); } @@ -251,7 +251,7 @@ protected GrpcClientChannel registerProducer(ProxyContext ctx, String topicName) return channel; } - protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List subscriptionEntryList) { + protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List subscriptionEntryList, boolean updateSubscription) { String clientId = ctx.getClientID(); LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage()); @@ -265,7 +265,8 @@ protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGr this.buildConsumeType(clientType), MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, - this.buildSubscriptionDataSet(subscriptionEntryList) + this.buildSubscriptionDataSet(subscriptionEntryList), + updateSubscription ); return channel; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java index f56627a2570..9225289822e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java @@ -71,7 +71,8 @@ public void registerConsumer( ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - Set subList + Set subList, + boolean updateSubscription ) { this.serviceManager.getConsumerManager().registerConsumer( consumerGroup, @@ -80,7 +81,8 @@ public void registerConsumer( messageModel, consumeFromWhere, subList, - false); + false, + updateSubscription); } public ClientChannelInfo findConsumerChannel( diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 23dd3c86a5f..5234237a22e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -241,8 +241,8 @@ public void registerProducerListener(ProducerChangeListener producerChangeListen @Override public void registerConsumer(ProxyContext ctx, String consumerGroup, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - Set subList) { - this.clientProcessor.registerConsumer(ctx, consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList); + Set subList, boolean updateSubscription) { + this.clientProcessor.registerConsumer(ctx, consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, updateSubscription); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 35015b6aeec..e0ae7147100 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -259,7 +259,8 @@ void registerConsumer( ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, - Set subList + Set subList, + boolean updateSubscription ); ClientChannelInfo findConsumerChannel(