Skip to content

Commit

Permalink
[ISSUE #5069] polish the startup of proxy; can specify parameters on …
Browse files Browse the repository at this point in the history
…the command line of proxy (#5083)

undefined
  • Loading branch information
xdkxlk committed Sep 16, 2022
1 parent 8ba84c3 commit 6f75e3d
Show file tree
Hide file tree
Showing 14 changed files with 438 additions and 44 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ header:
- '*/src/test/resources/META-INF/service/*'
- '*/src/main/resources/META-INF/service/*'
- '*/src/test/resources/rmq-proxy-home/conf/rmq-proxy.json'
- '*/src/test/resources/mockito-extensions/*'
- '**/target/**'
- '**/*.iml'
- 'docs/**'
Expand Down
35 changes: 34 additions & 1 deletion distribution/bin/mqbroker
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,37 @@ fi

export ROCKETMQ_HOME

sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@
other_args=" "
enable_proxy=false

while [[ $# -gt 0 ]]; do
case $1 in
--enable-proxy)
enable_proxy=true
shift
;;
-c|--configFile)
broker_config="$2"
shift
shift
;;
*)
other_args=${other_args}" "${1}
shift
;;
esac
done

if [ "$enable_proxy" = true ]; then
args_for_proxy=$other_args" -pm local"
if [ "$broker_config" != "" ]; then
args_for_proxy=${args_for_proxy}" -bc "${broker_config}
fi
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
else
args_for_broker=$other_args
if [ "$broker_config" != "" ]; then
args_for_broker=${args_for_broker}" -c "${broker_config}
fi
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
fi
7 changes: 6 additions & 1 deletion distribution/bin/mqshutdown
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

case $1 in
broker)

pid=`ps ax | grep -i 'org.apache.rocketmq.proxy.ProxyStartup' | grep '\-pm local' |grep java | grep -v grep | awk '{print $1}'`
if [ "$pid" != "" ] ; then
echo "The mqbroker with proxy enable is running(${pid})..."
kill ${pid}
echo "Send shutdown request to mqbroker with proxy enable OK(${pid})"
fi
pid=`ps ax | grep -i 'org.apache.rocketmq.broker.BrokerStartup' |grep java | grep -v grep | awk '{print $1}'`
if [ -z "$pid" ] ; then
echo "No mqbroker running."
Expand Down
2 changes: 1 addition & 1 deletion distribution/conf/rmq-proxy.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{

"rocketMQClusterName": "DefaultCluster"
}
2 changes: 2 additions & 0 deletions proxy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ java_library(
"//common",
"//client",
"//broker",
"//srvutil",
"//acl",
"@maven//:org_apache_rocketmq_rocketmq_proto",
"@maven//:org_apache_commons_commons_lang3",
Expand All @@ -50,6 +51,7 @@ java_library(
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_google_code_findbugs_jsr305",
"@maven//:org_checkerframework_checker_qual",
"@maven//:commons_cli_commons_cli",
],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.proxy;

public class CommandLineArgument {
private String namesrvAddr;
private String brokerConfigPath;
private String proxyConfigPath;
private String proxyMode;

public String getNamesrvAddr() {
return namesrvAddr;
}

public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}

public String getBrokerConfigPath() {
return brokerConfigPath;
}

public void setBrokerConfigPath(String brokerConfigPath) {
this.brokerConfigPath = brokerConfigPath;
}

public String getProxyConfigPath() {
return proxyConfigPath;
}

public void setProxyConfigPath(String proxyConfigPath) {
this.proxyConfigPath = proxyConfigPath;
}

public String getProxyMode() {
return proxyMode;
}

public void setProxyMode(String proxyMode) {
this.proxyMode = proxyMode;
}
}
85 changes: 79 additions & 6 deletions proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import com.google.common.collect.Lists;
import io.grpc.protobuf.services.ChannelzService;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
Expand All @@ -36,13 +42,16 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.config.Configuration;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.GrpcServer;
import org.apache.rocketmq.proxy.grpc.GrpcServerBuilder;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;

public class ProxyStartup {
Expand All @@ -58,9 +67,9 @@ public void appendStartAndShutdown(StartAndShutdown startAndShutdown) {

public static void main(String[] args) {
try {
ConfigurationManager.initEnv();
initLogger();
ConfigurationManager.intConfig();
// parse argument from command line
CommandLineArgument commandLineArgument = parseCommandLineArgument(args);
initLogAndConfiguration(commandLineArgument);

// init thread pool monitor for proxy.
initThreadPoolMonitor();
Expand Down Expand Up @@ -100,7 +109,59 @@ public static void main(String[] args) {
log.info(new Date() + " rocketmq-proxy startup successfully");
}

private static MessagingProcessor createMessagingProcessor() {
protected static void initLogAndConfiguration(CommandLineArgument commandLineArgument) throws Exception {
if (StringUtils.isNotBlank(commandLineArgument.getProxyConfigPath())) {
System.setProperty(Configuration.CONFIG_PATH_PROPERTY, commandLineArgument.getProxyConfigPath());
}
ConfigurationManager.initEnv();
initLogger();
ConfigurationManager.intConfig();
setConfigFromCommandLineArgument(commandLineArgument);
}

protected static CommandLineArgument parseCommandLineArgument(String[] args) {
CommandLine commandLine = ServerUtil.parseCmdLine("mqproxy", args,
buildCommandlineOptions(), new DefaultParser());
if (commandLine == null) {
throw new RuntimeException("parse command line argument failed");
}

CommandLineArgument commandLineArgument = new CommandLineArgument();
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), commandLineArgument);
return commandLineArgument;
}

private static Options buildCommandlineOptions() {
Options options = ServerUtil.buildCommandlineOptions(new Options());

Option opt = new Option("bc", "brokerConfigPath", true, "Broker config file path for local mode");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("pc", "proxyConfigPath", true, "Proxy config file path");
opt.setRequired(false);
options.addOption(opt);

opt = new Option("pm", "proxyMode", true, "Proxy run in local or cluster mode");
opt.setRequired(false);
options.addOption(opt);

return options;
}

private static void setConfigFromCommandLineArgument(CommandLineArgument commandLineArgument) {
if (StringUtils.isNotBlank(commandLineArgument.getNamesrvAddr())) {
ConfigurationManager.getProxyConfig().setNamesrvAddr(commandLineArgument.getNamesrvAddr());
}
if (StringUtils.isNotBlank(commandLineArgument.getBrokerConfigPath())) {
ConfigurationManager.getProxyConfig().setBrokerConfigPath(commandLineArgument.getBrokerConfigPath());
}
if (StringUtils.isNotBlank(commandLineArgument.getProxyMode())) {
ConfigurationManager.getProxyConfig().setProxyMode(commandLineArgument.getProxyMode());
}
}

protected static MessagingProcessor createMessagingProcessor() {
String proxyModeStr = ConfigurationManager.getProxyConfig().getProxyMode();
MessagingProcessor messagingProcessor;

Expand All @@ -112,6 +173,12 @@ private static MessagingProcessor createMessagingProcessor() {
@Override
public void start() throws Exception {
brokerController.start();
String tip = "The broker[" + brokerController.getBrokerConfig().getBrokerName() + ", "
+ brokerController.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != brokerController.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + brokerController.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
}

@Override
Expand All @@ -134,8 +201,14 @@ private static GrpcMessagingApplication createServiceProcessor(MessagingProcesso
return application;
}

private static BrokerController createBrokerController() {
String[] brokerStartupArgs = new String[] {"-c", ConfigurationManager.getProxyConfig().getBrokerConfigPath()};
protected static BrokerController createBrokerController() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
List<String> brokerStartupArgList = Lists.newArrayList("-c", config.getBrokerConfigPath());
if (StringUtils.isNotBlank(config.getNamesrvAddr())) {
brokerStartupArgList.add("-n");
brokerStartupArgList.add(config.getNamesrvAddr());
}
String[] brokerStartupArgs = brokerStartupArgList.toArray(new String[0]);
return BrokerStartup.createBrokerController(brokerStartupArgs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,46 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Configuration {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final AtomicReference<ProxyConfig> proxyConfigReference = new AtomicReference<>();
public static final String CONFIG_PATH_PROPERTY = "com.rocketmq.proxy.configPath";

public void init() throws Exception {
String proxyConfigData = loadJsonConfig(ProxyConfig.CONFIG_FILE_NAME);
if (null == proxyConfigData) {
throw new RuntimeException(String.format("load configuration from file: %s error.", ProxyConfig.CONFIG_FILE_NAME));
}
String proxyConfigData = loadJsonConfig();

ProxyConfig proxyConfig = JSON.parseObject(proxyConfigData, ProxyConfig.class);
proxyConfig.initData();
setProxyConfig(proxyConfig);
}

public static String loadJsonConfig(String configFileName) throws Exception {
final String testResource = "rmq-proxy-home/conf/" + configFileName;
try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
if (null != inputStream) {
return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
public static String loadJsonConfig() throws Exception {
String configFileName = ProxyConfig.DEFAULT_CONFIG_FILE_NAME;
String filePath = System.getProperty(CONFIG_PATH_PROPERTY);
if (StringUtils.isBlank(filePath)) {
final String testResource = "rmq-proxy-home/conf/" + configFileName;
try (InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(testResource)) {
if (null != inputStream) {
return CharStreams.toString(new InputStreamReader(inputStream, Charsets.UTF_8));
}
}
filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();
}

String filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString();

File file = new File(filePath);
if (!file.exists()) {
log.warn("the config file {} not exist", filePath);
return null;
throw new RuntimeException(String.format("the config file %s not exist", filePath));
}
long fileLength = file.length();
if (fileLength <= 0) {
log.warn("the config file {} length is zero", filePath);
return null;
throw new RuntimeException(String.format("the config file %s length is zero", filePath));
}

return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
Expand Down
Loading

0 comments on commit 6f75e3d

Please sign in to comment.