From 7a405a6e87bbcaa160344c81fd24313ee23f3e4f Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Fri, 28 Jul 2023 17:47:33 +0800 Subject: [PATCH] Avoid dispatch tasks too much cause dispatch task failed --- .../apache/rocketmq/tieredstore/TieredDispatcher.java | 9 +++++---- .../rocketmq/tieredstore/common/TieredStoreExecutor.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 523b0c2cde3..bc8b944015c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -180,10 +180,6 @@ public void dispatch(DispatchRequest request) { message.release(); flatFile.getCompositeFlatFileLock().unlock(); } - } else { - if (!flatFile.getCompositeFlatFileLock().isLocked()) { - this.dispatchFlatFileAsync(flatFile); - } } } @@ -199,6 +195,11 @@ public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile) { } public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, Consumer consumer) { + // Avoid dispatch tasks too much + if (TieredStoreExecutor.dispatchThreadPoolQueue.size() > + TieredStoreExecutor.QUEUE_CAPACITY * 0.75) { + return; + } TieredStoreExecutor.dispatchExecutor.execute(() -> { try { dispatchFlatFile(flatFile); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java index 23f1b01eacd..6eb3478b3d9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java @@ -27,7 +27,7 @@ public class TieredStoreExecutor { - private static final int QUEUE_CAPACITY = 10000; + public static final int QUEUE_CAPACITY = 10000; // Visible for monitor public static BlockingQueue dispatchThreadPoolQueue;