forked from apache/rocketmq
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ISSUE apache#3949] Initialize basic structure
- Loading branch information
Showing
20 changed files
with
979 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 26 additions & 0 deletions
26
common/src/main/java/org/apache/rocketmq/common/logger/ProxyLogger.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.logger; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ProxyLogger { | ||
public static final Logger LOG_WATER_MARK = LoggerFactory.getLogger("Watermark"); | ||
public static final Logger LOG_JSTACK = LoggerFactory.getLogger("Jstack"); | ||
} |
28 changes: 28 additions & 0 deletions
28
common/src/main/java/org/apache/rocketmq/common/logger/WatermarkLogger.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.logger; | ||
|
||
import org.slf4j.Logger; | ||
|
||
public class WatermarkLogger { | ||
private static final Logger LOG_MSG_TRACE = ProxyLogger.LOG_WATER_MARK; | ||
|
||
public static void info(String name, String k, double v) { | ||
LOG_MSG_TRACE.info("\t{}\t{}\t{}", name, k, v); | ||
} | ||
} |
122 changes: 122 additions & 0 deletions
122
common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.thread; | ||
|
||
import com.google.common.collect.Lists; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.rocketmq.common.UtilAll; | ||
import org.apache.rocketmq.common.logger.ProxyLogger; | ||
import org.apache.rocketmq.common.logger.WatermarkLogger; | ||
|
||
public class ThreadPoolMonitor { | ||
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>(); | ||
private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor( | ||
new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build() | ||
); | ||
|
||
private static volatile boolean enablePrintJstack = true; | ||
private static volatile long jstackPeriodTIme = 60000; | ||
private static volatile long jstackTime = System.currentTimeMillis(); | ||
|
||
public static void config(boolean enablePrintJstack, long jstackPeriodTime) { | ||
ThreadPoolMonitor.enablePrintJstack = enablePrintJstack; | ||
jstackPeriodTIme = jstackPeriodTime; | ||
} | ||
|
||
public static ThreadPoolExecutor createAndMonitor(int corePoolSize, | ||
int maximumPoolSize, | ||
long keepAliveTime, | ||
TimeUnit unit, | ||
String name, | ||
int queueCapacity) { | ||
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, Collections.emptyList()); | ||
} | ||
|
||
public static ThreadPoolExecutor createAndMonitor(int corePoolSize, | ||
int maximumPoolSize, | ||
long keepAliveTime, | ||
TimeUnit unit, | ||
String name, | ||
int queueCapacity, | ||
ThreadPoolStatusMonitor... threadPoolStatusMonitors) { | ||
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, | ||
Lists.newArrayList(threadPoolStatusMonitors)); | ||
} | ||
|
||
public static ThreadPoolExecutor createAndMonitor(int corePoolSize, | ||
int maximumPoolSize, | ||
long keepAliveTime, | ||
TimeUnit unit, | ||
String name, | ||
int queueCapacity, | ||
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) { | ||
ThreadPoolExecutor executor = new ThreadPoolExecutor( | ||
corePoolSize, | ||
maximumPoolSize, | ||
keepAliveTime, | ||
unit, | ||
new LinkedBlockingQueue<>(queueCapacity), | ||
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(), | ||
new ThreadPoolExecutor.DiscardOldestPolicy()); | ||
List<ThreadPoolStatusMonitor> printers = Lists.newArrayList(new ThreadPoolQueueSizeMonitor(queueCapacity)); | ||
printers.addAll(threadPoolStatusMonitors); | ||
|
||
MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder() | ||
.name(name) | ||
.threadPoolExecutor(executor) | ||
.statusPrinters(printers) | ||
.build()); | ||
return executor; | ||
} | ||
|
||
public static void logThreadPoolStatus() { | ||
for (ThreadPoolWrapper threadPoolWrapper : MONITOR_EXECUTOR) { | ||
List<ThreadPoolStatusMonitor> monitors = threadPoolWrapper.getStatusPrinters(); | ||
for (ThreadPoolStatusMonitor monitor : monitors) { | ||
double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor()); | ||
WatermarkLogger.info(threadPoolWrapper.getName(), | ||
monitor.describe(), | ||
value); | ||
|
||
if (enablePrintJstack) { | ||
if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) && | ||
System.currentTimeMillis() - jstackTime > jstackPeriodTIme) { | ||
jstackTime = System.currentTimeMillis(); | ||
ProxyLogger.LOG_JSTACK.warn("jstack start \n " + UtilAll.jstack()); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
public static void init() { | ||
MONITOR_SCHEDULED.scheduleAtFixedRate(ThreadPoolMonitor::logThreadPoolStatus, 20, 1, TimeUnit.SECONDS); | ||
} | ||
|
||
public static void shutdown() { | ||
MONITOR_SCHEDULED.shutdown(); | ||
} | ||
} |
44 changes: 44 additions & 0 deletions
44
common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolQueueSizeMonitor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.thread; | ||
|
||
import java.util.concurrent.ThreadPoolExecutor; | ||
|
||
public class ThreadPoolQueueSizeMonitor implements ThreadPoolStatusMonitor { | ||
|
||
private final int maxQueueCapacity; | ||
|
||
public ThreadPoolQueueSizeMonitor(int maxQueueCapacity) { | ||
this.maxQueueCapacity = maxQueueCapacity; | ||
} | ||
|
||
@Override | ||
public String describe() { | ||
return "queueSize"; | ||
} | ||
|
||
@Override | ||
public double value(ThreadPoolExecutor executor) { | ||
return executor.getQueue().size(); | ||
} | ||
|
||
@Override | ||
public boolean needPrintJstack(ThreadPoolExecutor executor, double value) { | ||
return value > maxQueueCapacity * 0.85; | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolStatusMonitor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.thread; | ||
|
||
import java.util.concurrent.ThreadPoolExecutor; | ||
|
||
public interface ThreadPoolStatusMonitor { | ||
|
||
String describe(); | ||
|
||
double value(ThreadPoolExecutor executor); | ||
|
||
boolean needPrintJstack(ThreadPoolExecutor executor, double value); | ||
} |
124 changes: 124 additions & 0 deletions
124
common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.common.thread; | ||
|
||
import com.google.common.base.Objects; | ||
import java.util.List; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
|
||
public class ThreadPoolWrapper { | ||
private String name; | ||
private ThreadPoolExecutor threadPoolExecutor; | ||
private List<ThreadPoolStatusMonitor> statusPrinters; | ||
|
||
ThreadPoolWrapper(final String name, final ThreadPoolExecutor threadPoolExecutor, | ||
final List<ThreadPoolStatusMonitor> statusPrinters) { | ||
this.name = name; | ||
this.threadPoolExecutor = threadPoolExecutor; | ||
this.statusPrinters = statusPrinters; | ||
} | ||
|
||
public static class ThreadPoolWrapperBuilder { | ||
private String name; | ||
private ThreadPoolExecutor threadPoolExecutor; | ||
private List<ThreadPoolStatusMonitor> statusPrinters; | ||
|
||
ThreadPoolWrapperBuilder() { | ||
} | ||
|
||
public ThreadPoolWrapper.ThreadPoolWrapperBuilder name(final String name) { | ||
this.name = name; | ||
return this; | ||
} | ||
|
||
public ThreadPoolWrapper.ThreadPoolWrapperBuilder threadPoolExecutor( | ||
final ThreadPoolExecutor threadPoolExecutor) { | ||
this.threadPoolExecutor = threadPoolExecutor; | ||
return this; | ||
} | ||
|
||
public ThreadPoolWrapper.ThreadPoolWrapperBuilder statusPrinters( | ||
final List<ThreadPoolStatusMonitor> statusPrinters) { | ||
this.statusPrinters = statusPrinters; | ||
return this; | ||
} | ||
|
||
public ThreadPoolWrapper build() { | ||
return new ThreadPoolWrapper(this.name, this.threadPoolExecutor, this.statusPrinters); | ||
} | ||
|
||
@java.lang.Override | ||
public java.lang.String toString() { | ||
return "ThreadPoolWrapper.ThreadPoolWrapperBuilder(name=" + this.name + ", threadPoolExecutor=" + this.threadPoolExecutor + ", statusPrinters=" + this.statusPrinters + ")"; | ||
} | ||
} | ||
|
||
public static ThreadPoolWrapper.ThreadPoolWrapperBuilder builder() { | ||
return new ThreadPoolWrapper.ThreadPoolWrapperBuilder(); | ||
} | ||
|
||
public String getName() { | ||
return this.name; | ||
} | ||
|
||
public ThreadPoolExecutor getThreadPoolExecutor() { | ||
return this.threadPoolExecutor; | ||
} | ||
|
||
public List<ThreadPoolStatusMonitor> getStatusPrinters() { | ||
return this.statusPrinters; | ||
} | ||
|
||
public void setName(final String name) { | ||
this.name = name; | ||
} | ||
|
||
public void setThreadPoolExecutor(final ThreadPoolExecutor threadPoolExecutor) { | ||
this.threadPoolExecutor = threadPoolExecutor; | ||
} | ||
|
||
public void setStatusPrinters(final List<ThreadPoolStatusMonitor> statusPrinters) { | ||
this.statusPrinters = statusPrinters; | ||
} | ||
|
||
@Override public boolean equals(Object o) { | ||
if (this == o) | ||
return true; | ||
if (o == null || getClass() != o.getClass()) | ||
return false; | ||
ThreadPoolWrapper wrapper = (ThreadPoolWrapper) o; | ||
return Objects.equal(name, wrapper.name) && Objects.equal(threadPoolExecutor, wrapper.threadPoolExecutor) && Objects.equal(statusPrinters, wrapper.statusPrinters); | ||
} | ||
|
||
@Override public int hashCode() { | ||
return Objects.hashCode(name, threadPoolExecutor, statusPrinters); | ||
} | ||
|
||
@Override public String toString() { | ||
final StringBuilder sb = new StringBuilder("ThreadPoolWrapper{"); | ||
sb.append("name='") | ||
.append(name) | ||
.append('\''); | ||
sb.append(", threadPoolExecutor=") | ||
.append(threadPoolExecutor); | ||
sb.append(", statusPrinters=") | ||
.append(statusPrinters); | ||
sb.append('}'); | ||
return sb.toString(); | ||
} | ||
} |
Oops, something went wrong.