Skip to content

Commit

Permalink
rocksdb metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
fujian-zfj committed Jul 28, 2023
1 parent 3b0a27b commit 80e259e
Show file tree
Hide file tree
Showing 23 changed files with 1,813 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
Expand All @@ -66,8 +68,12 @@
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
Expand Down Expand Up @@ -120,6 +126,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
Expand Down Expand Up @@ -301,9 +308,16 @@ public BrokerController(
this.messageStoreConfig = messageStoreConfig;
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
if (isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
} else {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
}
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.peekMessageProcessor = new PeekMessageProcessor(this);
Expand All @@ -324,7 +338,6 @@ public BrokerController(
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.scheduleMessageService = new ScheduleMessageService(this);
this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this);
this.coldDataCgCtrService = new ColdDataCgCtrService(this);
Expand Down Expand Up @@ -1383,8 +1396,6 @@ protected void shutdownBasicService() {
this.adminBrokerExecutor.shutdown();
}

this.consumerOffsetManager.persist();

if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
Expand Down Expand Up @@ -1449,8 +1460,20 @@ protected void shutdownBasicService() {
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);

this.topicConfigManager.persist();
this.subscriptionGroupManager.persist();
if (this.topicConfigManager != null) {
this.topicConfigManager.persist();
this.topicConfigManager.stop();
}

if (this.subscriptionGroupManager != null) {
this.subscriptionGroupManager.persist();
this.subscriptionGroupManager.stop();
}

if (this.consumerOffsetManager != null) {
this.consumerOffsetManager.persist();
this.consumerOffsetManager.stop();
}

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
Expand Down Expand Up @@ -2375,4 +2398,8 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}

public boolean isEnableRocksDBStore() {
return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class ConsumerOffsetManager extends ConfigManager {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";

private DataVersion dataVersion = new DataVersion();

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);

private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
Expand All @@ -62,6 +62,10 @@ public ConsumerOffsetManager(BrokerController brokerController) {
this.brokerController = brokerController;
}

protected void removeConsumerOffset(String topicAtGroup) {

}

public void cleanOffset(String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.io.File;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.store.RocksDBOffsetSerializeWrapper;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
}

@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
}

@Override
protected void removeConsumerOffset(String topicAtGroup) {
try {
byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset);
this.rocksDBConfigManager.delete(keyBytes);
} catch (Exception e) {
LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup);
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable());
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

@Override
public String configFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

@Override
public synchronized void persist() {
WriteBatch writeBatch = new WriteBatch();
try {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> iterator = this.offsetTable.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> entry = iterator.next();
putWriteBatch(writeBatch, entry.getKey(), entry.getValue());

if (writeBatch.getDataSize() >= 4 * 1024) {
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
}
}
this.rocksDBConfigManager.batchPutWithWal(writeBatch);
this.rocksDBConfigManager.flushWAL();
} catch (Exception e) {
LOG.error("consumer offset persist Failed", e);
} finally {
writeBatch.close();
}
}

private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap<Integer, Long> offsetMap) throws Exception {
byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset);
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
wrapper.setOffsetTable(offsetMap);
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class RocksDBLmqConsumerOffsetManager extends RocksDBConsumerOffsetManager {
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public RocksDBLmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic, queueId);
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
return offset;
}
return -1;
}

@Override
public Map<Integer, Long> queryOffset(final String group, final String topic) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic);
}
Map<Integer, Long> map = new HashMap<>();
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
map.put(0, offset);
}
return map;
}

@Override
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
if (!MixAll.isLmq(group)) {
super.commitOffset(clientHost, group, topic, queueId, offset);
return;
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
lmqOffsetTable.put(key, offset);
}

@Override
public String encode() {
return this.encode(false);
}

@Override
public void decode(String jsonString) {
if (jsonString != null) {
RocksDBLmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, RocksDBLmqConsumerOffsetManager.class);
if (obj != null) {
super.setOffsetTable(obj.getOffsetTable());
this.lmqOffsetTable = obj.lmqOffsetTable;
}
}
}

@Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}

public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
return lmqOffsetTable;
}

public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public ScheduleMessageService(final BrokerController brokerController) {
this.brokerController = brokerController;
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
scheduledPersistService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
}

public static int queueId2DelayLevel(final int queueId) {
Expand Down Expand Up @@ -169,7 +169,7 @@ public void shutdown() {
ThreadUtils.shutdown(scheduledPersistService);
}

public void stop() {
public boolean stop() {
if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) {
this.deliverExecutorService.shutdown();
try {
Expand All @@ -193,6 +193,7 @@ public void stop() {

this.persist();
}
return true;
}

public boolean isStarted() {
Expand Down
Loading

0 comments on commit 80e259e

Please sign in to comment.