Skip to content

Commit

Permalink
[ISSUE apache#3949] Add updateSubscription for registerConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Jul 19, 2022
1 parent 77df4ce commit 852e954
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
return registerConsumer(group, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList,
isNotifyConsumerIdsChangedEnable, true);
}

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable, boolean updateSubscription) {
long start = System.currentTimeMillis();
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
Expand All @@ -131,7 +138,10 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
boolean r2 = false;
if (updateSubscription) {
r2 = consumerGroupInfo.updateSubscription(subList);
}

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public CompletableFuture<HeartbeatResponse> heartbeat(ProxyContext ctx, Heartbea
case SIMPLE_CONSUMER: {
validateConsumerGroup(request.getGroup());
String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList());
this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList(), false);
break;
}
default: {
Expand Down Expand Up @@ -227,7 +227,7 @@ protected TelemetryCommand processClientSettings(ProxyContext ctx, TelemetryComm
if (settings.hasSubscription()) {
validateConsumerGroup(settings.getSubscription().getGroup());
String groupName = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
GrpcClientChannel consumerChannel = registerConsumer(ctx, groupName, settings.getClientType(), settings.getSubscription().getSubscriptionsList());
GrpcClientChannel consumerChannel = registerConsumer(ctx, groupName, settings.getClientType(), settings.getSubscription().getSubscriptionsList(), true);
consumerChannel.setClientObserver(responseObserver);
}

Expand All @@ -251,7 +251,7 @@ protected GrpcClientChannel registerProducer(ProxyContext ctx, String topicName)
return channel;
}

protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List<SubscriptionEntry> subscriptionEntryList) {
protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGroup, ClientType clientType, List<SubscriptionEntry> subscriptionEntryList, boolean updateSubscription) {
String clientId = ctx.getClientID();
LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());

Expand All @@ -265,7 +265,8 @@ protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGr
this.buildConsumeType(clientType),
MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,
this.buildSubscriptionDataSet(subscriptionEntryList)
this.buildSubscriptionDataSet(subscriptionEntryList),
updateSubscription
);
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void registerConsumer(
ConsumeType consumeType,
MessageModel messageModel,
ConsumeFromWhere consumeFromWhere,
Set<SubscriptionData> subList
Set<SubscriptionData> subList,
boolean updateSubscription
) {
this.serviceManager.getConsumerManager().registerConsumer(
consumerGroup,
Expand All @@ -80,7 +81,8 @@ public void registerConsumer(
messageModel,
consumeFromWhere,
subList,
false);
false,
updateSubscription);
}

public ClientChannelInfo findConsumerChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ public void registerProducerListener(ProducerChangeListener producerChangeListen
@Override
public void registerConsumer(ProxyContext ctx, String consumerGroup, ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
Set<SubscriptionData> subList) {
this.clientProcessor.registerConsumer(ctx, consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList);
Set<SubscriptionData> subList, boolean updateSubscription) {
this.clientProcessor.registerConsumer(ctx, consumerGroup, clientChannelInfo, consumeType, messageModel, consumeFromWhere, subList, updateSubscription);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ void registerConsumer(
ConsumeType consumeType,
MessageModel messageModel,
ConsumeFromWhere consumeFromWhere,
Set<SubscriptionData> subList
Set<SubscriptionData> subList,
boolean updateSubscription
);

ClientChannelInfo findConsumerChannel(
Expand Down

0 comments on commit 852e954

Please sign in to comment.