diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java new file mode 100644 index 00000000000..c186bfb61cd --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.grpc.v2; + +import org.apache.rocketmq.proxy.common.ProxyContext; + +public interface ContextStreamObserver { + + void onNext(ProxyContext ctx, V value); + + void onError(Throwable t); + + void onCompleted(); +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java index 9d49e0e2caa..73b764bc4fb 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java @@ -150,8 +150,7 @@ public CompletableFuture changeInvisibleDuratio } @Override - public StreamObserver telemetry(ProxyContext ctx, - StreamObserver responseObserver) { - return this.clientActivity.telemetry(ctx, responseObserver); + public ContextStreamObserver telemetry(StreamObserver responseObserver) { + return this.clientActivity.telemetry(responseObserver); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 32395322a39..2cb395ad603 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -378,17 +378,17 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request, @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build(); - ProxyContext context = createContext(); - StreamObserver responseTelemetryCommand = grpcMessingActivity.telemetry(context, responseObserver); + ContextStreamObserver responseTelemetryCommand = grpcMessingActivity.telemetry(responseObserver); return new StreamObserver() { @Override public void onNext(TelemetryCommand value) { + ProxyContext context = createContext(); try { validateContext(context); addExecutor(clientManagerThreadPoolExecutor, context, value, - () -> responseTelemetryCommand.onNext(value), + () -> responseTelemetryCommand.onNext(context, value), responseObserver, statusResponseCreator); } catch (Throwable t) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java index 8f1db82307a..77bd3a88f9d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java @@ -69,5 +69,5 @@ CompletableFuture notifyClientTermination(Proxy CompletableFuture changeInvisibleDuration(ProxyContext ctx, ChangeInvisibleDurationRequest request); - StreamObserver telemetry(ProxyContext ctx, StreamObserver responseObserver); + ContextStreamObserver telemetry(StreamObserver responseObserver); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index a60228eb9f8..8553289498b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity; +import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; @@ -174,11 +175,10 @@ public CompletableFuture notifyClientTerminatio return future; } - public StreamObserver telemetry(ProxyContext ctx, - StreamObserver responseObserver) { - return new StreamObserver() { + public ContextStreamObserver telemetry(StreamObserver responseObserver) { + return new ContextStreamObserver() { @Override - public void onNext(TelemetryCommand request) { + public void onNext(ProxyContext ctx, TelemetryCommand request) { try { switch (request.getCommandCase()) { case SETTINGS: { @@ -271,7 +271,7 @@ protected void processAndWriteClientSettings(ProxyContext ctx, TelemetryCommand protected TelemetryCommand processClientSettings(ProxyContext ctx, TelemetryCommand request) { String clientId = ctx.getClientID(); - grpcClientSettingsManager.updateClientSettings(clientId, request.getSettings()); + grpcClientSettingsManager.updateClientSettings(ctx, clientId, request.getSettings()); Settings settings = grpcClientSettingsManager.getClientSettings(ctx); return TelemetryCommand.newBuilder() .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name())) @@ -458,7 +458,11 @@ protected void processClientRegister(String group, Object... args) { if (settings == null) { return; } - grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), settings); + grpcClientSettingsManager.updateClientSettings( + ProxyContext.createForInner(this.getClass()), + clientChannelInfo.getClientId(), + settings + ); } } } @@ -475,7 +479,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener { public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) { if (event == ProducerGroupEvent.CLIENT_UNREGISTER) { grpcChannelManager.removeChannel(clientChannelInfo.getClientId()); - grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId()); + grpcClientSettingsManager.removeAndGetRawClientSettings(clientChannelInfo.getClientId()); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index af8b4546e1e..1eff659392e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -33,15 +33,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.MetricCollectorMode; import org.apache.rocketmq.proxy.config.ProxyConfig; @@ -68,7 +67,7 @@ public Settings getRawClientSettings(String clientId) { public Settings getClientSettings(ProxyContext ctx) { String clientId = ctx.getClientID(); - Settings settings = CLIENT_SETTINGS_MAP.get(clientId); + Settings settings = getRawClientSettings(clientId); if (settings == null) { return null; } @@ -182,7 +181,7 @@ protected static CustomizedBackoff convertToCustomizedRetryPolicy(CustomizedRetr .build(); } - public void updateClientSettings(String clientId, Settings settings) { + public void updateClientSettings(ProxyContext ctx, String clientId, Settings settings) { if (settings.hasSubscription()) { settings = createDefaultConsumerSettingsBuilder().mergeFrom(settings).build(); } @@ -194,17 +193,13 @@ protected Settings.Builder createDefaultConsumerSettingsBuilder() { .toBuilder(); } - public void removeClientSettings(String clientId) { - CLIENT_SETTINGS_MAP.remove(clientId); - } - - public void computeIfPresent(String clientId, Function function) { - CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> function.apply(value)); + public Settings removeAndGetRawClientSettings(String clientId) { + return CLIENT_SETTINGS_MAP.remove(clientId); } public Settings removeAndGetClientSettings(ProxyContext ctx) { String clientId = ctx.getClientID(); - Settings settings = CLIENT_SETTINGS_MAP.remove(clientId); + Settings settings = this.removeAndGetRawClientSettings(clientId); if (settings == null) { return null; } @@ -237,7 +232,10 @@ protected void onWaitEnd() { return settings; } String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup()); - ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(consumerGroup); + ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo( + ProxyContext.createForInner(this.getClass()), + consumerGroup + ); if (consumerGroupInfo == null || consumerGroupInfo.findChannel(clientId) == null) { log.info("remove unused grpc client settings. group:{}, settings:{}", consumerGroupInfo, settings); return null; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java index 8fb6eaf7df6..eeb9bf87e67 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java @@ -110,7 +110,7 @@ public void registerConsumerIdsChangeListener(ConsumerIdsChangeListener listener this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener); } - public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) { + public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup) { return this.serviceManager.getConsumerManager().getConsumerGroupInfo(consumerGroup); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 72ff9b939d0..e663ae1ba25 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -290,8 +290,8 @@ public void doChannelCloseEvent(String remoteAddr, Channel channel) { } @Override - public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) { - return this.clientProcessor.getConsumerGroupInfo(consumerGroup); + public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup) { + return this.clientProcessor.getConsumerGroupInfo(ctx, consumerGroup); } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 40ffb96a7a2..263068965a0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -288,7 +288,7 @@ void registerConsumerListener( void doChannelCloseEvent(String remoteAddr, Channel channel); - ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup); + ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup); void addTransactionSubscription( ProxyContext ctx, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java index 69280fb8645..1eb81ce9276 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java @@ -80,7 +80,7 @@ protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand r for (ProducerData data : heartbeatData.getProducerDataSet()) { ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - this.remotingChannelManager.createProducerChannel(ctx.channel(), data.getGroupName(), clientId), + this.remotingChannelManager.createProducerChannel(context, ctx.channel(), data.getGroupName(), clientId), clientId, request.getLanguage(), request.getVersion()); setClientPropertiesToChannelAttr(clientChannelInfo); @@ -89,7 +89,7 @@ protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand r for (ConsumerData data : heartbeatData.getConsumerDataSet()) { ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - this.remotingChannelManager.createConsumerChannel(ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()), + this.remotingChannelManager.createConsumerChannel(context, ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()), clientId, request.getLanguage(), request.getVersion()); setClientPropertiesToChannelAttr(clientChannelInfo); @@ -122,7 +122,7 @@ protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCo (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class); final String producerGroup = requestHeader.getProducerGroup(); if (producerGroup != null) { - RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel()); + RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(context, producerGroup, ctx.channel()); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( channel, requestHeader.getClientID(), @@ -132,7 +132,7 @@ protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCo } final String consumerGroup = requestHeader.getConsumerGroup(); if (consumerGroup != null) { - RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel()); + RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(context, consumerGroup, ctx.channel()); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( channel, requestHeader.getClientID(), @@ -170,7 +170,7 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) { } if (args[0] instanceof ClientChannelInfo) { ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0]; - remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel()); + remotingChannelManager.removeConsumerChannel(ProxyContext.createForInner(this.getClass()), group, clientChannelInfo.getChannel()); log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo); } } @@ -187,7 +187,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener { @Override public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) { if (event == ProducerGroupEvent.CLIENT_UNREGISTER) { - remotingChannelManager.removeProducerChannel(group, clientChannelInfo.getChannel()); + remotingChannelManager.removeProducerChannel(ProxyContext.createForInner(this.getClass()), group, clientChannelInfo.getChannel()); } } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java index e9d42afc2c9..b21b4afa42d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java @@ -83,7 +83,7 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class); GetConsumerListByGroupRequestHeader header = (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); - ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup()); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup()); List clientIds = consumerGroupInfo.getAllClientId(); GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); body.setConsumerIdList(clientIds); @@ -96,7 +96,7 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R ProxyContext context) throws Exception { RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class); GetConsumerConnectionListRequestHeader header = (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); - ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup()); + ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup()); if (consumerGroupInfo != null) { ConsumerConnection bodydata = new ConsumerConnection(); bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java index d548ddc0dfc..3324c231ab4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java @@ -41,7 +41,7 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); int sysFlag = requestHeader.getSysFlag(); if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) { - ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup()); + ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(context, requestHeader.getConsumerGroup()); if (consumerInfo == null) { return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST, "the consumer's subscription not latest"); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java index 133865f48bd..211c3c9275a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.common.utils.StartAndShutdown; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -57,11 +58,11 @@ protected String buildKey(String prefix, String group) { return prefix + group; } - public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) { + public RemotingChannel createProducerChannel(ProxyContext ctx, Channel channel, String group, String clientId) { return createChannel(channel, buildProducerKey(group), clientId, Collections.emptySet()); } - public RemotingChannel createConsumerChannel(Channel channel, String group, String clientId, Set subscriptionData) { + public RemotingChannel createConsumerChannel(ProxyContext ctx, Channel channel, String group, String clientId, Set subscriptionData) { return createChannel(channel, buildConsumerKey(group), clientId, subscriptionData); } @@ -96,11 +97,11 @@ public Set removeChannel(Channel channel) { return removedChannelSet; } - public RemotingChannel removeProducerChannel(String group, Channel channel) { + public RemotingChannel removeProducerChannel(ProxyContext ctx, String group, Channel channel) { return removeChannel(buildProducerKey(group), channel); } - public RemotingChannel removeConsumerChannel(String group, Channel channel) { + public RemotingChannel removeConsumerChannel(ProxyContext ctx, String group, Channel channel) { return removeChannel(buildConsumerKey(group), channel); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 3fa6414c39b..b6b14faa492 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -26,19 +26,18 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.proxy.common.AbstractCacheLoader; -import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; -import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.checkerframework.checker.nullness.qual.NonNull; @@ -52,8 +51,6 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected final LoadingCache topicCache; protected final ScheduledExecutorService scheduledExecutorService; protected final ThreadPoolExecutor cacheRefreshExecutor; - private final TopicRouteCacheLoader topicRouteCacheLoader = new TopicRouteCacheLoader(); - public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -76,13 +73,8 @@ public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { executor(cacheRefreshExecutor).build(new CacheLoader() { @Override public @Nullable MessageQueueView load(String topic) throws Exception { try { - TopicRouteData topicRouteData = topicRouteCacheLoader.loadTopicRouteData(topic); - if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); - log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); - return tmp; - } - return MessageQueueView.WRAPPED_EMPTY_QUEUE; + TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis()); + return buildMessageQueueView(topic, topicRouteData); } catch (Exception e) { if (TopicRouteHelper.isTopicNotExistError(e)) { return MessageQueueView.WRAPPED_EMPTY_QUEUE; @@ -138,44 +130,12 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { && routeData.getBrokerDatas() != null && !routeData.getBrokerDatas().isEmpty(); } - protected abstract class AbstractTopicRouteCacheLoader extends AbstractCacheLoader { - - public AbstractTopicRouteCacheLoader() { - super(cacheRefreshExecutor); - } - - protected abstract TopicRouteData loadTopicRouteData(String topic) throws Exception; - - @Override - public MessageQueueView getDirectly(String topic) throws Exception { - try { - TopicRouteData topicRouteData = loadTopicRouteData(topic); - - if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); - log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); - return tmp; - } - return MessageQueueView.WRAPPED_EMPTY_QUEUE; - } catch (Exception e) { - if (TopicRouteHelper.isTopicNotExistError(e)) { - return MessageQueueView.WRAPPED_EMPTY_QUEUE; - } - throw e; - } - } - - @Override - protected void onErr(String key, Exception e) { - log.error("load topic route from namesrv failed. topic:{}", key, e); - } - } - - protected class TopicRouteCacheLoader extends AbstractTopicRouteCacheLoader { - - @Override - protected TopicRouteData loadTopicRouteData(String topic) throws Exception { - return mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis()); + protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { + if (isTopicRouteValid(topicRouteData)) { + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData); + log.info("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); + return tmp; } + return MessageQueueView.WRAPPED_EMPTY_QUEUE; } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java index a5d4e3c9193..0c1ebcdfae7 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; +import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; @@ -341,7 +342,7 @@ public void testReportThreadStackTrace() { String nonce = "123"; when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture) runningInfoFutureMock); ProxyContext context = createContext(); - StreamObserver streamObserver = clientActivity.telemetry(context, new StreamObserver() { + ContextStreamObserver streamObserver = clientActivity.telemetry(new StreamObserver() { @Override public void onNext(TelemetryCommand value) { } @@ -354,7 +355,7 @@ public void onError(Throwable t) { public void onCompleted() { } }); - streamObserver.onNext(TelemetryCommand.newBuilder() + streamObserver.onNext(context, TelemetryCommand.newBuilder() .setThreadStackTrace(ThreadStackTrace.newBuilder() .setThreadStackTrace(jstack) .setNonce(nonce) @@ -373,7 +374,7 @@ public void testReportVerifyMessageResult() { String nonce = "123"; when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture) resultFutureMock); ProxyContext context = createContext(); - StreamObserver streamObserver = clientActivity.telemetry(context, new StreamObserver() { + ContextStreamObserver streamObserver = clientActivity.telemetry(new StreamObserver() { @Override public void onNext(TelemetryCommand value) { } @@ -386,7 +387,7 @@ public void onError(Throwable t) { public void onCompleted() { } }); - streamObserver.onNext(TelemetryCommand.newBuilder() + streamObserver.onNext(context, TelemetryCommand.newBuilder() .setVerifyMessageResult(VerifyMessageResult.newBuilder() .setNonce(nonce) .build()) @@ -418,11 +419,8 @@ public void onCompleted() { } }; - StreamObserver requestObserver = this.clientActivity.telemetry( - ctx, - responseObserver - ); - requestObserver.onNext(TelemetryCommand.newBuilder() + ContextStreamObserver requestObserver = this.clientActivity.telemetry(responseObserver); + requestObserver.onNext(ctx, TelemetryCommand.newBuilder() .setSettings(settings) .build()); return future; diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java index 9044873a6d3..6742f094c82 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java @@ -54,7 +54,7 @@ public void before() throws Throwable { public void testGetProducerData() { ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); - this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID, Settings.newBuilder() + this.grpcClientSettingsManager.updateClientSettings(context, CLIENT_ID, Settings.newBuilder() .setBackoffPolicy(RetryPolicy.getDefaultInstance()) .setPublishing(Publishing.getDefaultInstance()) .build()); @@ -65,18 +65,18 @@ public void testGetProducerData() { @Test public void testGetSubscriptionData() { + ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any())) .thenReturn(subscriptionGroupConfig); - this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID, Settings.newBuilder() + this.grpcClientSettingsManager.updateClientSettings(context, CLIENT_ID, Settings.newBuilder() .setSubscription(Subscription.newBuilder() .setGroup(Resource.newBuilder().setName("group").build()) .build()) .build()); - ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); - Settings settings = this.grpcClientSettingsManager.getClientSettings(context); assertEquals(settings.getBackoffPolicy(), this.grpcClientSettingsManager.createDefaultConsumerSettingsBuilder().build().getBackoffPolicy()); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java index d8ad4518758..a2f1f4cc89f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java @@ -77,7 +77,7 @@ public void setup() throws Exception { @Test public void testPullMessageWithoutSub() throws Exception { - when(messagingProcessorMock.getConsumerGroupInfo(eq(group))) + when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group))) .thenReturn(consumerGroupInfoMock); SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setSubString(subString); @@ -128,7 +128,7 @@ public void testPullMessageWithoutSub() throws Exception { @Test public void testPullMessageWithSub() throws Exception { - when(messagingProcessorMock.getConsumerGroupInfo(eq(group))) + when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group))) .thenReturn(consumerGroupInfoMock); SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setSubString(subString); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java index 5a5b441e957..11224059375 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelId; import java.util.HashSet; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; @@ -46,6 +47,7 @@ public class RemotingChannelManagerTest { private final String remoteAddress = "10.152.39.53:9768"; private final String localAddress = "11.193.0.1:1210"; private RemotingChannelManager remotingChannelManager; + private final ProxyContext ctx = ProxyContext.createForInner(this.getClass()); @Before public void before() { @@ -58,13 +60,13 @@ public void testCreateChannel() { String clientId = RandomStringUtils.randomAlphabetic(10); Channel producerChannel = createMockChannel(); - RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, clientId); assertNotNull(producerRemotingChannel); - assertSame(producerRemotingChannel, this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId)); + assertSame(producerRemotingChannel, this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, clientId)); Channel consumerChannel = createMockChannel(); - RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); - assertSame(consumerRemotingChannel, this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>())); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, clientId, new HashSet<>())); assertNotNull(consumerRemotingChannel); assertNotSame(producerRemotingChannel, consumerRemotingChannel); @@ -77,14 +79,14 @@ public void testRemoveProducerChannel() { { Channel producerChannel = createMockChannel(); - RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); - assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerRemotingChannel)); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, clientId); + assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(ctx, group, producerRemotingChannel)); assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); } { Channel producerChannel = createMockChannel(); - RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, group, clientId); - assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(group, producerChannel)); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, clientId); + assertSame(producerRemotingChannel, this.remotingChannelManager.removeProducerChannel(ctx, group, producerChannel)); assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); } } @@ -96,14 +98,14 @@ public void testRemoveConsumerChannel() { { Channel consumerChannel = createMockChannel(); - RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); - assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerRemotingChannel)); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(ctx, group, consumerRemotingChannel)); assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); } { Channel consumerChannel = createMockChannel(); - RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, group, clientId, new HashSet<>()); - assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(group, consumerChannel)); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, clientId, new HashSet<>()); + assertSame(consumerRemotingChannel, this.remotingChannelManager.removeConsumerChannel(ctx, group, consumerChannel)); assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty()); } } @@ -115,9 +117,9 @@ public void testRemoveChannel() { String clientId = RandomStringUtils.randomAlphabetic(10); Channel consumerChannel = createMockChannel(); - RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(consumerChannel, consumerGroup, clientId, new HashSet<>()); + RemotingChannel consumerRemotingChannel = this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, consumerGroup, clientId, new HashSet<>()); Channel producerChannel = createMockChannel(); - RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(producerChannel, producerGroup, clientId); + RemotingChannel producerRemotingChannel = this.remotingChannelManager.createProducerChannel(ctx, producerChannel, producerGroup, clientId); assertSame(consumerRemotingChannel, this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get()); assertSame(producerRemotingChannel, this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get()); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java index 02912446cfd..6766564bc72 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.protocol.body; +import com.google.common.base.MoreObjects; import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.common.message.MessageQueue; @@ -59,4 +60,14 @@ public Set getMqSet() { public void setMqSet(Set mqSet) { this.mqSet = mqSet; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("consumerGroup", consumerGroup) + .add("clientId", clientId) + .add("onlyThisBroker", onlyThisBroker) + .add("mqSet", mqSet) + .toString(); + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java index fcac7ed9ae9..2ad906739cc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.protocol.body; +import com.google.common.base.MoreObjects; import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.common.message.MessageQueue; @@ -59,4 +60,14 @@ public Set getMqSet() { public void setMqSet(Set mqSet) { this.mqSet = mqSet; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("consumerGroup", consumerGroup) + .add("clientId", clientId) + .add("onlyThisBroker", onlyThisBroker) + .add("mqSet", mqSet) + .toString(); + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java index 5965e9dcbb4..2ccf564df56 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.remoting.protocol.header; +import com.google.common.base.MoreObjects; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; @@ -99,4 +100,17 @@ public String getAttemptId() { public void setAttemptId(String attemptId) { this.attemptId = attemptId; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("consumerGroup", consumerGroup) + .add("topic", topic) + .add("queueId", queueId) + .add("pollTime", pollTime) + .add("bornTime", bornTime) + .add("order", order) + .add("attemptId", attemptId) + .toString(); + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java index 39aaa011762..e16d38a7a3e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java @@ -20,6 +20,7 @@ */ package org.apache.rocketmq.remoting.protocol.header; +import com.google.common.base.MoreObjects; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; @@ -73,4 +74,14 @@ public Boolean getSetZeroIfNotFound() { public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) { this.setZeroIfNotFound = setZeroIfNotFound; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("consumerGroup", consumerGroup) + .add("topic", topic) + .add("queueId", queueId) + .add("setZeroIfNotFound", setZeroIfNotFound) + .toString(); + } }