diff --git a/common/pom.xml b/common/pom.xml index cea58306ae1..ad2046a28fb 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -44,5 +44,10 @@ com.google.guava guava + + org.slf4j + slf4j-api + 1.7.7 + diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index c29eccd4cbe..2b4d9b14ef4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -39,4 +39,5 @@ public class LoggerName { public static final String FILTER_LOGGER_NAME = "RocketmqFilter"; public static final String ROCKETMQ_POP_LOGGER_NAME = "RocketmqPop"; public static final String STDOUT_LOGGER_NAME = "STDOUT"; + public static final String GRPC_LOGGER_NAME = "RocketmqGrpc"; } diff --git a/common/src/main/java/org/apache/rocketmq/common/logger/ProxyLogger.java b/common/src/main/java/org/apache/rocketmq/common/logger/ProxyLogger.java new file mode 100644 index 00000000000..1e51ed9d8d1 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/logger/ProxyLogger.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.logger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProxyLogger { + public static final Logger LOG_WATER_MARK = LoggerFactory.getLogger("Watermark"); + public static final Logger LOG_JSTACK = LoggerFactory.getLogger("Jstack"); +} diff --git a/common/src/main/java/org/apache/rocketmq/common/logger/WatermarkLogger.java b/common/src/main/java/org/apache/rocketmq/common/logger/WatermarkLogger.java new file mode 100644 index 00000000000..9e57d02c8ff --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/logger/WatermarkLogger.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.logger; + +import org.slf4j.Logger; + +public class WatermarkLogger { + private static final Logger LOG_MSG_TRACE = ProxyLogger.LOG_WATER_MARK; + + public static void info(String name, String k, double v) { + LOG_MSG_TRACE.info("\t{}\t{}\t{}", name, k, v); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java new file mode 100644 index 00000000000..6db0b2f15a4 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.thread; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.logger.ProxyLogger; +import org.apache.rocketmq.common.logger.WatermarkLogger; + +public class ThreadPoolMonitor { + private static final List MONITOR_EXECUTOR = new CopyOnWriteArrayList<>(); + private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build() + ); + + private static volatile boolean enablePrintJstack = true; + private static volatile long jstackPeriodTIme = 60000; + private static volatile long jstackTime = System.currentTimeMillis(); + + public static void config(boolean enablePrintJstack, long jstackPeriodTime) { + ThreadPoolMonitor.enablePrintJstack = enablePrintJstack; + jstackPeriodTIme = jstackPeriodTime; + } + + public static ThreadPoolExecutor createAndMonitor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + String name, + int queueCapacity) { + return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, Collections.emptyList()); + } + + public static ThreadPoolExecutor createAndMonitor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + String name, + int queueCapacity, + ThreadPoolStatusMonitor... threadPoolStatusMonitors) { + return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, + Lists.newArrayList(threadPoolStatusMonitors)); + } + + public static ThreadPoolExecutor createAndMonitor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + String name, + int queueCapacity, + List threadPoolStatusMonitors) { + ThreadPoolExecutor executor = new ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + new LinkedBlockingQueue<>(queueCapacity), + new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(), + new ThreadPoolExecutor.DiscardOldestPolicy()); + List printers = Lists.newArrayList(new ThreadPoolQueueSizeMonitor(queueCapacity)); + printers.addAll(threadPoolStatusMonitors); + + MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder() + .name(name) + .threadPoolExecutor(executor) + .statusPrinters(printers) + .build()); + return executor; + } + + public static void logThreadPoolStatus() { + for (ThreadPoolWrapper threadPoolWrapper : MONITOR_EXECUTOR) { + List monitors = threadPoolWrapper.getStatusPrinters(); + for (ThreadPoolStatusMonitor monitor : monitors) { + double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor()); + WatermarkLogger.info(threadPoolWrapper.getName(), + monitor.describe(), + value); + + if (enablePrintJstack) { + if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) && + System.currentTimeMillis() - jstackTime > jstackPeriodTIme) { + jstackTime = System.currentTimeMillis(); + ProxyLogger.LOG_JSTACK.warn("jstack start \n " + UtilAll.jstack()); + } + } + } + } + } + + public static void init() { + MONITOR_SCHEDULED.scheduleAtFixedRate(ThreadPoolMonitor::logThreadPoolStatus, 20, 1, TimeUnit.SECONDS); + } + + public static void shutdown() { + MONITOR_SCHEDULED.shutdown(); + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolQueueSizeMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolQueueSizeMonitor.java new file mode 100644 index 00000000000..9e2e2f675ca --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolQueueSizeMonitor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.thread; + +import java.util.concurrent.ThreadPoolExecutor; + +public class ThreadPoolQueueSizeMonitor implements ThreadPoolStatusMonitor { + + private final int maxQueueCapacity; + + public ThreadPoolQueueSizeMonitor(int maxQueueCapacity) { + this.maxQueueCapacity = maxQueueCapacity; + } + + @Override + public String describe() { + return "queueSize"; + } + + @Override + public double value(ThreadPoolExecutor executor) { + return executor.getQueue().size(); + } + + @Override + public boolean needPrintJstack(ThreadPoolExecutor executor, double value) { + return value > maxQueueCapacity * 0.85; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolStatusMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolStatusMonitor.java new file mode 100644 index 00000000000..548fec52ec0 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolStatusMonitor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.thread; + +import java.util.concurrent.ThreadPoolExecutor; + +public interface ThreadPoolStatusMonitor { + + String describe(); + + double value(ThreadPoolExecutor executor); + + boolean needPrintJstack(ThreadPoolExecutor executor, double value); +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolWrapper.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolWrapper.java new file mode 100644 index 00000000000..653fa1d8339 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolWrapper.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.thread; + +import com.google.common.base.Objects; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +public class ThreadPoolWrapper { + private String name; + private ThreadPoolExecutor threadPoolExecutor; + private List statusPrinters; + + ThreadPoolWrapper(final String name, final ThreadPoolExecutor threadPoolExecutor, + final List statusPrinters) { + this.name = name; + this.threadPoolExecutor = threadPoolExecutor; + this.statusPrinters = statusPrinters; + } + + public static class ThreadPoolWrapperBuilder { + private String name; + private ThreadPoolExecutor threadPoolExecutor; + private List statusPrinters; + + ThreadPoolWrapperBuilder() { + } + + public ThreadPoolWrapper.ThreadPoolWrapperBuilder name(final String name) { + this.name = name; + return this; + } + + public ThreadPoolWrapper.ThreadPoolWrapperBuilder threadPoolExecutor( + final ThreadPoolExecutor threadPoolExecutor) { + this.threadPoolExecutor = threadPoolExecutor; + return this; + } + + public ThreadPoolWrapper.ThreadPoolWrapperBuilder statusPrinters( + final List statusPrinters) { + this.statusPrinters = statusPrinters; + return this; + } + + public ThreadPoolWrapper build() { + return new ThreadPoolWrapper(this.name, this.threadPoolExecutor, this.statusPrinters); + } + + @java.lang.Override + public java.lang.String toString() { + return "ThreadPoolWrapper.ThreadPoolWrapperBuilder(name=" + this.name + ", threadPoolExecutor=" + this.threadPoolExecutor + ", statusPrinters=" + this.statusPrinters + ")"; + } + } + + public static ThreadPoolWrapper.ThreadPoolWrapperBuilder builder() { + return new ThreadPoolWrapper.ThreadPoolWrapperBuilder(); + } + + public String getName() { + return this.name; + } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return this.threadPoolExecutor; + } + + public List getStatusPrinters() { + return this.statusPrinters; + } + + public void setName(final String name) { + this.name = name; + } + + public void setThreadPoolExecutor(final ThreadPoolExecutor threadPoolExecutor) { + this.threadPoolExecutor = threadPoolExecutor; + } + + public void setStatusPrinters(final List statusPrinters) { + this.statusPrinters = statusPrinters; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ThreadPoolWrapper wrapper = (ThreadPoolWrapper) o; + return Objects.equal(name, wrapper.name) && Objects.equal(threadPoolExecutor, wrapper.threadPoolExecutor) && Objects.equal(statusPrinters, wrapper.statusPrinters); + } + + @Override public int hashCode() { + return Objects.hashCode(name, threadPoolExecutor, statusPrinters); + } + + @Override public String toString() { + final StringBuilder sb = new StringBuilder("ThreadPoolWrapper{"); + sb.append("name='") + .append(name) + .append('\''); + sb.append(", threadPoolExecutor=") + .append(threadPoolExecutor); + sb.append(", statusPrinters=") + .append(statusPrinters); + sb.append('}'); + return sb.toString(); + } +} diff --git a/grpc/pom.xml b/grpc/pom.xml deleted file mode 100644 index 6c578762057..00000000000 --- a/grpc/pom.xml +++ /dev/null @@ -1,39 +0,0 @@ - - - - - org.apache.rocketmq - rocketmq-all - 5.0.0-BETA-SNAPSHOT - - - 4.0.0 - jar - rocketmq-grpc - rocketmq-grpc ${project.version} - - - - org.apache.rocketmq - rocketmq-proto - 5.0.0-SNAPSHOT - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6dfd57c4969..56bcf9033c0 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ ${project.basedir}/../test/target/jacoco-it.exec file:**/generated-sources/**,**/test/** + 1.38.0 @@ -122,6 +123,7 @@ acl example grpc + proxy @@ -519,6 +521,11 @@ rocketmq-example ${project.version} + + ${project.groupId} + rocketmq-grpc + ${project.version} + org.slf4j slf4j-api @@ -611,7 +618,37 @@ commons-validator 1.7 - + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-services + ${grpc.version} + + + io.grpc + grpc-testing + ${grpc.version} + test + + + com.google.protobuf + protobuf-java-util + 3.17.2 + diff --git a/proxy/pom.xml b/proxy/pom.xml new file mode 100644 index 00000000000..37c383076d6 --- /dev/null +++ b/proxy/pom.xml @@ -0,0 +1,82 @@ + + + + + + rocketmq-all + org.apache.rocketmq + 5.0.0-BETA-SNAPSHOT + + + 4.0.0 + jar + rocketmq-proxy + rocketmq-proxy ${project.version} + + + 8 + 8 + + + + + org.apache.rocketmq + rocketmq-proto + 5.0.0-SNAPSHOT + + + io.grpc + grpc-netty-shaded + + + io.grpc + grpc-protobuf + + + io.grpc + grpc-stub + + + io.grpc + grpc-services + + + com.google.protobuf + protobuf-java-util + + + org.apache.rocketmq + rocketmq-common + + + org.apache.commons + commons-lang3 + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/configuration/Configuration.java b/proxy/src/main/java/org/apache/rocketmq/configuration/Configuration.java new file mode 100644 index 00000000000..9b33c61d38f --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/configuration/Configuration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.configuration; + +import com.alibaba.fastjson.JSON; +import java.io.File; +import java.nio.file.Files; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Configuration { + private final static Logger log = LoggerFactory.getLogger(Configuration.class); + private final AtomicReference proxyConfigReference = new AtomicReference<>(); + + public void init() throws Exception { + String proxyConfigData = loadJsonConfig(ProxyConfig.CONFIG_FILE_NAME); + ProxyConfig proxyConfig = JSON.parseObject(proxyConfigData, ProxyConfig.class); + setProxyConfig(proxyConfig); + } + + public static String loadJsonConfig(String configFileName) throws Exception { + String filePath = new File(ConfigurationManager.getProxyHome() + File.separator + "conf", configFileName).toString(); + + File file = new File(filePath); + if (!file.exists()) { + log.warn("the config file {} not exist", filePath); + return null; + } + long fileLength = file.length(); + if (fileLength <= 0) { + log.warn("the config file {} length is zero", filePath); + return null; + } + + return new String(Files.readAllBytes(file.toPath())); + } + + public ProxyConfig getProxyConfig() { + return proxyConfigReference.get(); + } + + public void setProxyConfig(ProxyConfig proxyConfig) { + proxyConfigReference.set(proxyConfig); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/configuration/ConfigurationManager.java b/proxy/src/main/java/org/apache/rocketmq/configuration/ConfigurationManager.java new file mode 100644 index 00000000000..3c732a34042 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/configuration/ConfigurationManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.configuration; + +import java.io.File; +import org.apache.commons.lang3.StringUtils; + +public class ConfigurationManager { + private static final String RMQ_PROXY_HOME = "RMQ_PROXY_HOME"; + private static final String DEFAULT_RMQ_PROXY_HOME = System.getProperty("user.home") + File.separator + "rmq-proxy"; + private static String proxyHome; + private static Configuration configuration; + + public static void initEnv() { + proxyHome = System.getenv(RMQ_PROXY_HOME); + if (StringUtils.isEmpty(proxyHome)) { + proxyHome = System.getProperty(RMQ_PROXY_HOME, DEFAULT_RMQ_PROXY_HOME); + } + } + + public static void intConfig() throws Exception { + configuration = new Configuration(); + configuration.init(); + } + + public static String getProxyHome() { + return proxyHome; + } + + public static ProxyConfig getProxyConfig() { + return configuration.getProxyConfig(); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/configuration/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/configuration/ProxyConfig.java new file mode 100644 index 00000000000..2c2d555e2ce --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/configuration/ProxyConfig.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.configuration; + +import org.apache.rocketmq.grpc.ProxyMode; + +public class ProxyConfig { + public final static String CONFIG_FILE_NAME = "rmq-proxy.json"; + + /** + * gRPC + */ + private String proxyMode = ProxyMode.CLUSTER.name(); + private Boolean startGrpcServer = true; + private Integer grpcServerPort = 8081; + private String grpcTlsKeyPath = "/home/admin/rmq-gateway/conf/tls/gRPC.key.pem"; + private String grpcTlsCertPath = "/home/admin/rmq-gateway/conf/tls/gRPC.chain.cert.pem"; + private int grpcBossLoopNum = 1; + private int grpcWorkerLoopNum = Runtime.getRuntime().availableProcessors() * 2; + private int grpcThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int grpcThreadPoolQueueCapacity = 100000; + /** + * gRPC max message size + * 130M = 4M * 32 messages + 2M attributes + */ + private int grpcMaxInboundMessageSize = 130 * 1024 * 1024; + + public String getProxyMode() { + return proxyMode; + } + + public void setProxyMode(String proxyMode) { + this.proxyMode = proxyMode; + } + + public Boolean getStartGrpcServer() { + return startGrpcServer; + } + + public void setStartGrpcServer(Boolean startGrpcServer) { + this.startGrpcServer = startGrpcServer; + } + + public Integer getGrpcServerPort() { + return grpcServerPort; + } + + public void setGrpcServerPort(Integer grpcServerPort) { + this.grpcServerPort = grpcServerPort; + } + + public String getGrpcTlsKeyPath() { + return grpcTlsKeyPath; + } + + public void setGrpcTlsKeyPath(String grpcTlsKeyPath) { + this.grpcTlsKeyPath = grpcTlsKeyPath; + } + + public String getGrpcTlsCertPath() { + return grpcTlsCertPath; + } + + public void setGrpcTlsCertPath(String grpcTlsCertPath) { + this.grpcTlsCertPath = grpcTlsCertPath; + } + + public int getGrpcBossLoopNum() { + return grpcBossLoopNum; + } + + public void setGrpcBossLoopNum(int grpcBossLoopNum) { + this.grpcBossLoopNum = grpcBossLoopNum; + } + + public int getGrpcWorkerLoopNum() { + return grpcWorkerLoopNum; + } + + public void setGrpcWorkerLoopNum(int grpcWorkerLoopNum) { + this.grpcWorkerLoopNum = grpcWorkerLoopNum; + } + + public int getGrpcThreadPoolNums() { + return grpcThreadPoolNums; + } + + public void setGrpcThreadPoolNums(int grpcThreadPoolNums) { + this.grpcThreadPoolNums = grpcThreadPoolNums; + } + + public int getGrpcThreadPoolQueueCapacity() { + return grpcThreadPoolQueueCapacity; + } + + public void setGrpcThreadPoolQueueCapacity(int grpcThreadPoolQueueCapacity) { + this.grpcThreadPoolQueueCapacity = grpcThreadPoolQueueCapacity; + } + + public int getGrpcMaxInboundMessageSize() { + return grpcMaxInboundMessageSize; + } + + public void setGrpcMaxInboundMessageSize(int grpcMaxInboundMessageSize) { + this.grpcMaxInboundMessageSize = grpcMaxInboundMessageSize; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/ClusterGrpcService.java b/proxy/src/main/java/org/apache/rocketmq/grpc/ClusterGrpcService.java new file mode 100644 index 00000000000..ce53d4db551 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/ClusterGrpcService.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +public class ClusterGrpcService implements GrpcService { +} diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcMessagingProcessor.java new file mode 100644 index 00000000000..42e959d57fd --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcMessagingProcessor.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +import apache.rocketmq.v1.MessagingServiceGrpc; + +public class GrpcMessagingProcessor extends MessagingServiceGrpc.MessagingServiceImplBase { + private final GrpcService grpcService; + + public GrpcMessagingProcessor(GrpcService grpcService) { + this.grpcService = grpcService; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcServer.java new file mode 100644 index 00000000000..682ea86dca0 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcServer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; +import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.common.thread.ThreadPoolMonitor; +import org.apache.rocketmq.configuration.ConfigurationManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcServer { + + private static final Logger log = LoggerFactory.getLogger(GrpcServer.class); + + private final io.grpc.Server server; + + private final ThreadPoolExecutor executor; + + public GrpcServer(GrpcService grpcService) { + int port = ConfigurationManager.getProxyConfig().getGrpcServerPort(); + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port); + + // add tls files + String tlsKeyPath = ConfigurationManager.getProxyConfig().getGrpcTlsKeyPath(); + String tlsCertPath = ConfigurationManager.getProxyConfig().getGrpcTlsCertPath(); + try { + InputStream serverKeyInputStream = new FileInputStream(tlsKeyPath); + InputStream serverCertificateStream = new FileInputStream(tlsCertPath); + + SslContext sslContext = GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .clientAuth(ClientAuth.NONE) + .build(); + serverBuilder.sslContext(sslContext); + } catch (IOException e) { + log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e); + throw new RuntimeException("grpc tls set failed: " + e.getMessage()); + } + + // create executor + int threadPoolNums = ConfigurationManager.getProxyConfig().getGrpcThreadPoolNums(); + int threadPoolQueueCapacity = ConfigurationManager.getProxyConfig().getGrpcThreadPoolQueueCapacity(); + this.executor = ThreadPoolMonitor.createAndMonitor( + threadPoolNums, + threadPoolNums, + 1, TimeUnit.MINUTES, + "GrpcRequestExecutorThread", + threadPoolQueueCapacity + ); + + GrpcMessagingProcessor messagingProcessor = new GrpcMessagingProcessor(grpcService); + + // build server + int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum(); + int workerLoopNum = ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum(); + int maxInboundMessageSize = ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize(); + + this.server = serverBuilder + .maxInboundMessageSize(maxInboundMessageSize) + .bossEventLoopGroup(new NioEventLoopGroup(bossLoopNum)) + .workerEventLoopGroup(new NioEventLoopGroup(workerLoopNum)) + .channelType(NioServerSocketChannel.class) + .addService(messagingProcessor) + .executor(this.executor) + .build(); + + log.info( + "grpc server has built. port: {}, tlsKeyPath: {}, tlsCertPath: {}, threadPool: {}, queueCapacity: {}, " + + "boosLoop: {}, workerLoop: {}, maxInboundMessageSize: {}", + port, tlsKeyPath, tlsCertPath, threadPoolNums, threadPoolQueueCapacity, + bossLoopNum, workerLoopNum, maxInboundMessageSize); + } + + public void start() throws Exception { + this.server.start(); + log.info("grpc server has started"); + } + + public void shutdown() { + try { + this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + this.executor.shutdown(); + + log.info("grpc server has stopped"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcService.java b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcService.java new file mode 100644 index 00000000000..4d60f080e15 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/GrpcService.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +public interface GrpcService { +} diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/LocalGrpcService.java b/proxy/src/main/java/org/apache/rocketmq/grpc/LocalGrpcService.java new file mode 100644 index 00000000000..d1945d97c79 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/LocalGrpcService.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +public class LocalGrpcService implements GrpcService { +} diff --git a/proxy/src/main/java/org/apache/rocketmq/grpc/ProxyMode.java b/proxy/src/main/java/org/apache/rocketmq/grpc/ProxyMode.java new file mode 100644 index 00000000000..0b7eaf47cd6 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/grpc/ProxyMode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.grpc; + +public enum ProxyMode { + LOCAL("LOCAL"), + CLUSTER("CLUSTER"); + + private final String mode; + + ProxyMode(String mode) { + this.mode = mode; + } + + public static boolean isClusterMode(String mode) { + if (mode == null) { + return false; + } + return CLUSTER.mode.equals(mode.toUpperCase()); + } + + public static boolean isLocalMode(String mode) { + if (mode == null) { + return false; + } + return LOCAL.mode.equals(mode.toUpperCase()); + } +}