Skip to content

Commit

Permalink
fix:consumer offset compatible problem caused by LmqConsumerOffsetMan…
Browse files Browse the repository at this point in the history
…ager
  • Loading branch information
humkum committed Jul 28, 2023
1 parent d797377 commit fde0f80
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public LmqConsumerOffsetManager() {
Expand All @@ -36,6 +40,92 @@ public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public boolean load() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);

if (null == jsonString || jsonString.length() == 0) {
if (!this.loadBak()) {
return loadOldVersionConsumerOffset();
}
} else {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
return true;
}

@Override
protected boolean loadBak() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
if (jsonString != null && jsonString.length() > 0) {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " Failed", e);
return false;
}
return false;
}

private boolean loadOldVersionConsumerOffset() {
String fileName = null;
try {
fileName = super.configFilePath();
String jsonString = MixAll.file2String(fileName);

if (null == jsonString || jsonString.length() == 0) {
return this.loadOldBak();
} else {
this.decodeOldVersion(jsonString);
log.info("load old version " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load old version" + fileName + " failed, and try to load old backup file", e);
return this.loadOldBak();
}
}

private boolean loadOldBak() {
String fileName = null;
try {
fileName = super.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
if (jsonString != null && jsonString.length() > 0) {
this.decodeOldVersion(jsonString);
log.info("load old bak" + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load old bak" + fileName + " Failed", e);
return false;
}
return true;
}

private void decodeOldVersion(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
super.setOffsetTable(obj.getOffsetTable());
}
}
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public boolean load() {

public abstract String configFilePath();

private boolean loadBak() {
protected boolean loadBak() {
String fileName = null;
try {
fileName = this.configFilePath();
Expand Down

0 comments on commit fde0f80

Please sign in to comment.