From fde0f8077083a11a86eccd6dfe7259e88bf8e8f2 Mon Sep 17 00:00:00 2001 From: Hankunming <1109939087@qq.com> Date: Thu, 27 Jul 2023 20:09:19 +0800 Subject: [PATCH] fix:consumer offset compatible problem caused by LmqConsumerOffsetManager --- .../offset/LmqConsumerOffsetManager.java | 90 +++++++++++++++++++ .../apache/rocketmq/common/ConfigManager.java | 2 +- 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index ce70b1a820f..5f1ad578a26 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -23,9 +23,13 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class LmqConsumerOffsetManager extends ConsumerOffsetManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private ConcurrentHashMap lmqOffsetTable = new ConcurrentHashMap<>(512); public LmqConsumerOffsetManager() { @@ -36,6 +40,92 @@ public LmqConsumerOffsetManager(BrokerController brokerController) { super(brokerController); } + @Override + public boolean load() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + + if (null == jsonString || jsonString.length() == 0) { + if (!this.loadBak()) { + return loadOldVersionConsumerOffset(); + } + } else { + this.decode(jsonString); + log.info("load " + fileName + " OK"); + return true; + } + } catch (Exception e) { + log.error("load " + fileName + " failed, and try to load backup file", e); + return this.loadBak(); + } + return true; + } + + @Override + protected boolean loadBak() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName + ".bak"); + if (jsonString != null && jsonString.length() > 0) { + this.decode(jsonString); + log.info("load " + fileName + " OK"); + return true; + } + } catch (Exception e) { + log.error("load " + fileName + " Failed", e); + return false; + } + return false; + } + + private boolean loadOldVersionConsumerOffset() { + String fileName = null; + try { + fileName = super.configFilePath(); + String jsonString = MixAll.file2String(fileName); + + if (null == jsonString || jsonString.length() == 0) { + return this.loadOldBak(); + } else { + this.decodeOldVersion(jsonString); + log.info("load old version " + fileName + " OK"); + return true; + } + } catch (Exception e) { + log.error("load old version" + fileName + " failed, and try to load old backup file", e); + return this.loadOldBak(); + } + } + + private boolean loadOldBak() { + String fileName = null; + try { + fileName = super.configFilePath(); + String jsonString = MixAll.file2String(fileName + ".bak"); + if (jsonString != null && jsonString.length() > 0) { + this.decodeOldVersion(jsonString); + log.info("load old bak" + fileName + " OK"); + return true; + } + } catch (Exception e) { + log.error("load old bak" + fileName + " Failed", e); + return false; + } + return true; + } + + private void decodeOldVersion(String jsonString) { + if (jsonString != null) { + ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); + if (obj != null) { + super.setOffsetTable(obj.getOffsetTable()); + } + } + } + @Override public long queryOffset(final String group, final String topic, final int queueId) { if (!MixAll.isLmq(group)) { diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index f712e1694d8..de893fddd86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -48,7 +48,7 @@ public boolean load() { public abstract String configFilePath(); - private boolean loadBak() { + protected boolean loadBak() { String fileName = null; try { fileName = this.configFilePath();