Skip to content

Commit

Permalink
[ISSUE #6964] use the correct context in telemetry; polish the code s…
Browse files Browse the repository at this point in the history
…tructure (#6965)
  • Loading branch information
xdkxlk committed Jun 29, 2023
1 parent 87075c2 commit bbbe737
Show file tree
Hide file tree
Showing 22 changed files with 160 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.grpc.v2;

import org.apache.rocketmq.proxy.common.ProxyContext;

public interface ContextStreamObserver<V> {

void onNext(ProxyContext ctx, V value);

void onError(Throwable t);

void onCompleted();
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuratio
}

@Override
public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
StreamObserver<TelemetryCommand> responseObserver) {
return this.clientActivity.telemetry(ctx, responseObserver);
public ContextStreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
return this.clientActivity.telemetry(responseObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,17 +378,17 @@ public void changeInvisibleDuration(ChangeInvisibleDurationRequest request,
@Override
public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();
ProxyContext context = createContext();
StreamObserver<TelemetryCommand> responseTelemetryCommand = grpcMessingActivity.telemetry(context, responseObserver);
ContextStreamObserver<TelemetryCommand> responseTelemetryCommand = grpcMessingActivity.telemetry(responseObserver);
return new StreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand value) {
ProxyContext context = createContext();
try {
validateContext(context);
addExecutor(clientManagerThreadPoolExecutor,
context,
value,
() -> responseTelemetryCommand.onNext(value),
() -> responseTelemetryCommand.onNext(context, value),
responseObserver,
statusResponseCreator);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ CompletableFuture<NotifyClientTerminationResponse> notifyClientTermination(Proxy
CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx,
ChangeInvisibleDurationRequest request);

StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx, StreamObserver<TelemetryCommand> responseObserver);
ContextStreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
Expand Down Expand Up @@ -174,11 +175,10 @@ public CompletableFuture<NotifyClientTerminationResponse> notifyClientTerminatio
return future;
}

public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
StreamObserver<TelemetryCommand> responseObserver) {
return new StreamObserver<TelemetryCommand>() {
public ContextStreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
return new ContextStreamObserver<TelemetryCommand>() {
@Override
public void onNext(TelemetryCommand request) {
public void onNext(ProxyContext ctx, TelemetryCommand request) {
try {
switch (request.getCommandCase()) {
case SETTINGS: {
Expand Down Expand Up @@ -271,7 +271,7 @@ protected void processAndWriteClientSettings(ProxyContext ctx, TelemetryCommand

protected TelemetryCommand processClientSettings(ProxyContext ctx, TelemetryCommand request) {
String clientId = ctx.getClientID();
grpcClientSettingsManager.updateClientSettings(clientId, request.getSettings());
grpcClientSettingsManager.updateClientSettings(ctx, clientId, request.getSettings());
Settings settings = grpcClientSettingsManager.getClientSettings(ctx);
return TelemetryCommand.newBuilder()
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, Code.OK.name()))
Expand Down Expand Up @@ -458,7 +458,11 @@ protected void processClientRegister(String group, Object... args) {
if (settings == null) {
return;
}
grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), settings);
grpcClientSettingsManager.updateClientSettings(
ProxyContext.createForInner(this.getClass()),
clientChannelInfo.getClientId(),
settings
);
}
}
}
Expand All @@ -475,7 +479,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener {
public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
grpcClientSettingsManager.removeAndGetRawClientSettings(clientChannelInfo.getClientId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.MetricCollectorMode;
import org.apache.rocketmq.proxy.config.ProxyConfig;
Expand All @@ -68,7 +67,7 @@ public Settings getRawClientSettings(String clientId) {

public Settings getClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
Settings settings = CLIENT_SETTINGS_MAP.get(clientId);
Settings settings = getRawClientSettings(clientId);
if (settings == null) {
return null;
}
Expand Down Expand Up @@ -182,7 +181,7 @@ protected static CustomizedBackoff convertToCustomizedRetryPolicy(CustomizedRetr
.build();
}

public void updateClientSettings(String clientId, Settings settings) {
public void updateClientSettings(ProxyContext ctx, String clientId, Settings settings) {
if (settings.hasSubscription()) {
settings = createDefaultConsumerSettingsBuilder().mergeFrom(settings).build();
}
Expand All @@ -194,17 +193,13 @@ protected Settings.Builder createDefaultConsumerSettingsBuilder() {
.toBuilder();
}

public void removeClientSettings(String clientId) {
CLIENT_SETTINGS_MAP.remove(clientId);
}

public void computeIfPresent(String clientId, Function<Settings, Settings> function) {
CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> function.apply(value));
public Settings removeAndGetRawClientSettings(String clientId) {
return CLIENT_SETTINGS_MAP.remove(clientId);
}

public Settings removeAndGetClientSettings(ProxyContext ctx) {
String clientId = ctx.getClientID();
Settings settings = CLIENT_SETTINGS_MAP.remove(clientId);
Settings settings = this.removeAndGetRawClientSettings(clientId);
if (settings == null) {
return null;
}
Expand Down Expand Up @@ -237,7 +232,10 @@ protected void onWaitEnd() {
return settings;
}
String consumerGroup = GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(consumerGroup);
ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(
ProxyContext.createForInner(this.getClass()),
consumerGroup
);
if (consumerGroupInfo == null || consumerGroupInfo.findChannel(clientId) == null) {
log.info("remove unused grpc client settings. group:{}, settings:{}", consumerGroupInfo, settings);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void registerConsumerIdsChangeListener(ConsumerIdsChangeListener listener
this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener);
}

public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup) {
return this.serviceManager.getConsumerManager().getConsumerGroupInfo(consumerGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ public void doChannelCloseEvent(String remoteAddr, Channel channel) {
}

@Override
public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
return this.clientProcessor.getConsumerGroupInfo(consumerGroup);
public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup) {
return this.clientProcessor.getConsumerGroupInfo(ctx, consumerGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void registerConsumerListener(

void doChannelCloseEvent(String remoteAddr, Channel channel);

ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup);
ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String consumerGroup);

void addTransactionSubscription(
ProxyContext ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand r

for (ProducerData data : heartbeatData.getProducerDataSet()) {
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
this.remotingChannelManager.createProducerChannel(ctx.channel(), data.getGroupName(), clientId),
this.remotingChannelManager.createProducerChannel(context, ctx.channel(), data.getGroupName(), clientId),
clientId, request.getLanguage(),
request.getVersion());
setClientPropertiesToChannelAttr(clientChannelInfo);
Expand All @@ -89,7 +89,7 @@ protected RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand r

for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
this.remotingChannelManager.createConsumerChannel(ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
this.remotingChannelManager.createConsumerChannel(context, ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
clientId, request.getLanguage(),
request.getVersion());
setClientPropertiesToChannelAttr(clientChannelInfo);
Expand Down Expand Up @@ -122,7 +122,7 @@ protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCo
(UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
final String producerGroup = requestHeader.getProducerGroup();
if (producerGroup != null) {
RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel());
RemotingChannel channel = this.remotingChannelManager.removeProducerChannel(context, producerGroup, ctx.channel());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
requestHeader.getClientID(),
Expand All @@ -132,7 +132,7 @@ protected RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCo
}
final String consumerGroup = requestHeader.getConsumerGroup();
if (consumerGroup != null) {
RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel());
RemotingChannel channel = this.remotingChannelManager.removeConsumerChannel(context, consumerGroup, ctx.channel());
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
channel,
requestHeader.getClientID(),
Expand Down Expand Up @@ -170,7 +170,7 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo) args[0];
remotingChannelManager.removeConsumerChannel(group, clientChannelInfo.getChannel());
remotingChannelManager.removeConsumerChannel(ProxyContext.createForInner(this.getClass()), group, clientChannelInfo.getChannel());
log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo);
}
}
Expand All @@ -187,7 +187,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener {
@Override
public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
remotingChannelManager.removeProducerChannel(group, clientChannelInfo.getChannel());
remotingChannelManager.removeProducerChannel(ProxyContext.createForInner(this.getClass()), group, clientChannelInfo.getChannel());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, Remo
ProxyContext context) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
GetConsumerListByGroupRequestHeader header = (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
List<String> clientIds = consumerGroupInfo.getAllClientId();
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
Expand All @@ -96,7 +96,7 @@ protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, R
ProxyContext context) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class);
GetConsumerConnectionListRequestHeader header = (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
ConsumerGroupInfo consumerGroupInfo = messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
if (consumerGroupInfo != null) {
ConsumerConnection bodydata = new ConsumerConnection();
bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom
PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
int sysFlag = requestHeader.getSysFlag();
if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(context, requestHeader.getConsumerGroup());
if (consumerInfo == null) {
return RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
"the consumer's subscription not latest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
Expand Down Expand Up @@ -57,11 +58,11 @@ protected String buildKey(String prefix, String group) {
return prefix + group;
}

public RemotingChannel createProducerChannel(Channel channel, String group, String clientId) {
public RemotingChannel createProducerChannel(ProxyContext ctx, Channel channel, String group, String clientId) {
return createChannel(channel, buildProducerKey(group), clientId, Collections.emptySet());
}

public RemotingChannel createConsumerChannel(Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
public RemotingChannel createConsumerChannel(ProxyContext ctx, Channel channel, String group, String clientId, Set<SubscriptionData> subscriptionData) {
return createChannel(channel, buildConsumerKey(group), clientId, subscriptionData);
}

Expand Down Expand Up @@ -96,11 +97,11 @@ public Set<RemotingChannel> removeChannel(Channel channel) {
return removedChannelSet;
}

public RemotingChannel removeProducerChannel(String group, Channel channel) {
public RemotingChannel removeProducerChannel(ProxyContext ctx, String group, Channel channel) {
return removeChannel(buildProducerKey(group), channel);
}

public RemotingChannel removeConsumerChannel(String group, Channel channel) {
public RemotingChannel removeConsumerChannel(ProxyContext ctx, String group, Channel channel) {
return removeChannel(buildConsumerKey(group), channel);
}

Expand Down
Loading

0 comments on commit bbbe737

Please sign in to comment.