diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 972457194fe..30b1d2299a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -45,6 +45,8 @@ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager; +import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.processor.AckMessageProcessor; @@ -66,8 +68,12 @@ import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; +import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager; +import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; @@ -120,6 +126,7 @@ import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.dledger.DLedgerCommitLog; @@ -301,9 +308,16 @@ public BrokerController( this.messageStoreConfig = messageStoreConfig; this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort())); this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); - this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this); this.broadcastOffsetManager = new BroadcastOffsetManager(this); - this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); + if (isEnableRocksDBStore()) { + this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this); + this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this); + this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this); + } else { + this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this); + this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); + this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this); + } this.topicQueueMappingManager = new TopicQueueMappingManager(this); this.pullMessageProcessor = new PullMessageProcessor(this); this.peekMessageProcessor = new PeekMessageProcessor(this); @@ -324,7 +338,6 @@ public BrokerController( this.popInflightMessageCounter = new PopInflightMessageCounter(this); this.clientHousekeepingService = new ClientHousekeepingService(this); this.broker2Client = new Broker2Client(this); - this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this); this.scheduleMessageService = new ScheduleMessageService(this); this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this); this.coldDataCgCtrService = new ColdDataCgCtrService(this); @@ -1383,8 +1396,6 @@ protected void shutdownBasicService() { this.adminBrokerExecutor.shutdown(); } - this.consumerOffsetManager.persist(); - if (this.brokerFastFailure != null) { this.brokerFastFailure.shutdown(); } @@ -1449,8 +1460,20 @@ protected void shutdownBasicService() { shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService); shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService); - this.topicConfigManager.persist(); - this.subscriptionGroupManager.persist(); + if (this.topicConfigManager != null) { + this.topicConfigManager.persist(); + this.topicConfigManager.stop(); + } + + if (this.subscriptionGroupManager != null) { + this.subscriptionGroupManager.persist(); + this.subscriptionGroupManager.stop(); + } + + if (this.consumerOffsetManager != null) { + this.consumerOffsetManager.persist(); + this.consumerOffsetManager.stop(); + } for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) { if (brokerAttachedPlugin != null) { @@ -2375,4 +2398,8 @@ public ColdDataCgCtrService getColdDataCgCtrService() { public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) { this.coldDataCgCtrService = coldDataCgCtrService; } + + public boolean isEnableRocksDBStore() { + return StoreType.DEFAULT_ROCKSDB.getStoreType().equalsIgnoreCase(this.messageStoreConfig.getStoreType()); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 8bf4e9a5994..25685893871 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -37,12 +37,12 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerOffsetManager extends ConfigManager { - private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public static final String TOPIC_GROUP_SEPARATOR = "@"; private DataVersion dataVersion = new DataVersion(); - private ConcurrentMap> offsetTable = + protected ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); private final ConcurrentMap> resetOffsetTable = @@ -62,6 +62,10 @@ public ConsumerOffsetManager(BrokerController brokerController) { this.brokerController = brokerController; } + protected void removeConsumerOffset(String topicAtGroup) { + + } + public void cleanOffset(String group) { Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java new file mode 100644 index 00000000000..8a2d91061d7 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.io.File; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.store.RocksDBOffsetSerializeWrapper; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; +import org.rocksdb.WriteBatch; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { + + public RocksDBConsumerOffsetManager(BrokerController brokerController) { + super(brokerController); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + return this.rocksDBConfigManager.load(configFilePath(), this::decode0); + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected void removeConsumerOffset(String topicAtGroup) { + try { + byte[] keyBytes = topicAtGroup.getBytes(DataConverter.charset); + this.rocksDBConfigManager.delete(keyBytes); + } catch (Exception e) { + LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup); + } + } + + @Override + protected void decode0(final byte[] key, final byte[] body) { + String topicAtGroup = new String(key, DataConverter.charset); + RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class); + + this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable()); + LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator; + } + + @Override + public synchronized void persist() { + WriteBatch writeBatch = new WriteBatch(); + try { + Iterator>> iterator = this.offsetTable.entrySet().iterator(); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + putWriteBatch(writeBatch, entry.getKey(), entry.getValue()); + + if (writeBatch.getDataSize() >= 4 * 1024) { + this.rocksDBConfigManager.batchPutWithWal(writeBatch); + } + } + this.rocksDBConfigManager.batchPutWithWal(writeBatch); + this.rocksDBConfigManager.flushWAL(); + } catch (Exception e) { + LOG.error("consumer offset persist Failed", e); + } finally { + writeBatch.close(); + } + } + + private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap offsetMap) throws Exception { + byte[] keyBytes = topicGroupName.getBytes(DataConverter.charset); + RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper(); + wrapper.setOffsetTable(offsetMap); + byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible); + writeBatch.put(keyBytes, valueBytes); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java new file mode 100644 index 00000000000..d0faa661406 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class RocksDBLmqConsumerOffsetManager extends RocksDBConsumerOffsetManager { + private ConcurrentHashMap lmqOffsetTable = new ConcurrentHashMap<>(512); + + public RocksDBLmqConsumerOffsetManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public long queryOffset(final String group, final String topic, final int queueId) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic, queueId); + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + return offset; + } + return -1; + } + + @Override + public Map queryOffset(final String group, final String topic) { + if (!MixAll.isLmq(group)) { + return super.queryOffset(group, topic); + } + Map map = new HashMap<>(); + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Long offset = lmqOffsetTable.get(key); + if (offset != null) { + map.put(0, offset); + } + return map; + } + + @Override + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, + final long offset) { + if (!MixAll.isLmq(group)) { + super.commitOffset(clientHost, group, topic, queueId, offset); + return; + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + lmqOffsetTable.put(key, offset); + } + + @Override + public String encode() { + return this.encode(false); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + RocksDBLmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, RocksDBLmqConsumerOffsetManager.class); + if (obj != null) { + super.setOffsetTable(obj.getOffsetTable()); + this.lmqOffsetTable = obj.lmqOffsetTable; + } + } + } + + @Override + public String encode(final boolean prettyFormat) { + return RemotingSerializable.toJson(this, prettyFormat); + } + + public ConcurrentHashMap getLmqOffsetTable() { + return lmqOffsetTable; + } + + public void setLmqOffsetTable(ConcurrentHashMap lmqOffsetTable) { + this.lmqOffsetTable = lmqOffsetTable; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 2a4ace09850..646d0c202f9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -92,7 +92,7 @@ public ScheduleMessageService(final BrokerController brokerController) { this.brokerController = brokerController; this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver(); scheduledPersistService = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); + new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig())); } public static int queueId2DelayLevel(final int queueId) { @@ -169,7 +169,7 @@ public void shutdown() { ThreadUtils.shutdown(scheduledPersistService); } - public void stop() { + public boolean stop() { if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) { this.deliverExecutorService.shutdown(); try { @@ -193,6 +193,7 @@ public void stop() { this.persist(); } + return true; } public boolean isStarted() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java new file mode 100644 index 00000000000..8c05d0bd98f --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.subscription; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +public class RocksDBLmqSubscriptionGroupManager extends RocksDBSubscriptionGroupManager { + + public RocksDBLmqSubscriptionGroupManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { + if (MixAll.isLmq(group)) { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(group); + return subscriptionGroupConfig; + } + return super.findSubscriptionGroupConfig(group); + } + + @Override + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { + if (config == null || MixAll.isLmq(config.getGroupName())) { + return; + } + super.updateSubscriptionGroupConfig(config); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java new file mode 100644 index 00000000000..92f1ed3232b --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.subscription; + +import java.io.File; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { + + public RocksDBSubscriptionGroupManager(BrokerController brokerController) { + super(brokerController); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + return false; + } + this.init(); + return true; + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + String groupName = subscriptionGroupConfig.getGroupName(); + SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig); + + try { + byte[] keyBytes = groupName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString()); + } + return oldConfig; + } + + @Override + protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) { + String groupName = subscriptionGroupConfig.getGroupName(); + SubscriptionGroupConfig oldConfig = this.subscriptionGroupTable.putIfAbsent(groupName, subscriptionGroupConfig); + if (oldConfig == null) { + try { + byte[] keyBytes = groupName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString()); + } + } + return oldConfig; + } + + @Override + protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) { + SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.remove(groupName); + try { + this.rocksDBConfigManager.delete(groupName.getBytes(DataConverter.charset)); + } catch (Exception e) { + log.error("kv delete sub Failed, {}", subscriptionGroupConfig.toString()); + } + return subscriptionGroupConfig; + } + + @Override + protected void decode0(byte[] key, byte[] body) { + String groupName = new String(key, DataConverter.charset); + SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(body, SubscriptionGroupConfig.class); + + this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig); + log.info("load exist local sub, {}", subscriptionGroupConfig.toString()); + } + + @Override + public synchronized void persist() { + if (brokerController.getMessageStoreConfig().isRealTimePersistRocksDBConfig()) { + this.rocksDBConfigManager.flushWAL(); + } + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 0ae11313f20..f404afad495 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -40,16 +40,16 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class SubscriptionGroupManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private ConcurrentMap subscriptionGroupTable = + protected ConcurrentMap subscriptionGroupTable = new ConcurrentHashMap<>(1024); private ConcurrentMap> forbiddenTable = new ConcurrentHashMap<>(4); private final DataVersion dataVersion = new DataVersion(); - private transient BrokerController brokerController; + protected transient BrokerController brokerController; public SubscriptionGroupManager() { this.init(); @@ -57,64 +57,86 @@ public SubscriptionGroupManager() { public SubscriptionGroupManager(BrokerController brokerController) { this.brokerController = brokerController; + } + + @Override + public boolean load() { + super.load(); this.init(); + return true; } - private void init() { + protected void init() { { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP); - this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(MixAll.CID_SYS_RMQ_TRANS); subscriptionGroupConfig.setConsumeBroadcastEnable(true); - this.subscriptionGroupTable.put(MixAll.CID_SYS_RMQ_TRANS, subscriptionGroupConfig); + putSubscriptionGroupConfig(subscriptionGroupConfig); } } + protected SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) { + return this.subscriptionGroupTable.put(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig); + } + + protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(SubscriptionGroupConfig subscriptionGroupConfig) { + return this.subscriptionGroupTable.putIfAbsent(subscriptionGroupConfig.getGroupName(), subscriptionGroupConfig); + } + + protected SubscriptionGroupConfig getSubscriptionGroupConfig(String groupName) { + return this.subscriptionGroupTable.get(groupName); + } + + protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) { + return this.subscriptionGroupTable.remove(groupName); + } + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { Map newAttributes = request(config); Map currentAttributes = current(config.getGroupName()); @@ -127,7 +149,7 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) config.setAttributes(finalAttributes); - SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); + SubscriptionGroupConfig old = putSubscriptionGroupConfig(config); if (old != null) { log.info("update subscription group config, old: {} new: {}", old, config); } else { @@ -218,7 +240,7 @@ private void updateForbiddenValue(String group, String topic, Integer forbidden) } public void disableConsume(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName); + SubscriptionGroupConfig old = getSubscriptionGroupConfig(groupName); if (old != null) { old.setConsumeEnable(false); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; @@ -227,7 +249,7 @@ public void disableConsume(final String groupName) { } public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { - SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group); + SubscriptionGroupConfig subscriptionGroupConfig = getSubscriptionGroupConfig(group); if (null == subscriptionGroupConfig) { if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { if (group.length() > Validators.CHARACTER_MAX_LENGTH || TopicValidator.isTopicOrGroupIllegal(group)) { @@ -235,7 +257,7 @@ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { } subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(group); - SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig); + SubscriptionGroupConfig preConfig = putSubscriptionGroupConfigIfAbsent(subscriptionGroupConfig); if (null == preConfig) { log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString()); } @@ -305,7 +327,7 @@ public DataVersion getDataVersion() { } public void deleteSubscriptionGroupConfig(final String groupName) { - SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); + SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName); this.forbiddenTable.remove(groupName); if (old != null) { log.info("delete subscription group OK, subscription group:{}", old); @@ -317,8 +339,12 @@ public void deleteSubscriptionGroupConfig(final String groupName) { } } + public void setSubscriptionGroupTable(ConcurrentMap subscriptionGroupTable) { - this.subscriptionGroupTable = subscriptionGroupTable; + this.subscriptionGroupTable.clear(); + for (String key : subscriptionGroupTable.keySet()) { + putSubscriptionGroupConfig(subscriptionGroupTable.get(key)); + } } public boolean containsSubscriptionGroup(String group) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java new file mode 100644 index 00000000000..d049a8dbcde --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.constant.PermName; + +public class RocksDBLmqTopicConfigManager extends RocksDBTopicConfigManager { + + public RocksDBLmqTopicConfigManager(BrokerController brokerController) { + super(brokerController); + } + + @Override + public TopicConfig selectTopicConfig(final String topic) { + if (MixAll.isLmq(topic)) { + return simpleLmqTopicConfig(topic); + } + return super.selectTopicConfig(topic); + } + + @Override + public void updateTopicConfig(final TopicConfig topicConfig) { + if (topicConfig == null || MixAll.isLmq(topicConfig.getTopicName())) { + return; + } + super.updateTopicConfig(topicConfig); + } + + @Override + public boolean containsTopic(String topic) { + if (MixAll.isLmq(topic)) { + return true; + } + return super.containsTopic(topic); + } + + private TopicConfig simpleLmqTopicConfig(String topic) { + return new TopicConfig(topic, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java new file mode 100644 index 00000000000..cb31a802af9 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.topic; + +import java.io.File; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.common.utils.DataConverter; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; + +public class RocksDBTopicConfigManager extends TopicConfigManager { + + public RocksDBTopicConfigManager(BrokerController brokerController) { + super(brokerController); + this.rocksDBConfigManager = new RocksDBConfigManager(this.brokerController.getMessageStoreConfig().getMemTableFlushInterval()); + } + + @Override + public boolean load() { + if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + return false; + } + this.init(); + return true; + } + + @Override + public boolean stop() { + return this.rocksDBConfigManager.stop(); + } + + @Override + protected void decode0(byte[] key, byte[] body) { + String topicName = new String(key, DataConverter.charset); + TopicConfig topicConfig = JSON.parseObject(body, TopicConfig.class); + + this.topicConfigTable.put(topicName, topicConfig); + log.info("load exist local topic, {}", topicConfig.toString()); + } + + @Override + public String configFilePath() { + return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator; + } + + @Override + protected TopicConfig putTopicConfig(TopicConfig topicConfig) { + String topicName = topicConfig.getTopicName(); + TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, topicConfig); + try { + byte[] keyBytes = topicName.getBytes(DataConverter.charset); + byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible); + this.rocksDBConfigManager.put(keyBytes, keyBytes.length, valueBytes); + } catch (Exception e) { + log.error("kv put topic Failed, {}", topicConfig.toString(), e); + } + return oldTopicConfig; + } + + @Override + protected TopicConfig removeTopicConfig(String topicName) { + TopicConfig topicConfig = this.topicConfigTable.remove(topicName); + try { + this.rocksDBConfigManager.delete(topicName.getBytes(DataConverter.charset)); + } catch (Exception e) { + log.error("kv remove topic Failed, {}", topicConfig.toString()); + } + return topicConfig; + } + + @Override + public synchronized void persist() { + if (brokerController.getMessageStoreConfig().isRealTimePersistRocksDBConfig()) { + this.rocksDBConfigManager.flushWAL(); + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index e9053051293..d9012bc03e5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.topic; -import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -27,6 +26,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.collect.ImmutableMap; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; @@ -50,27 +52,38 @@ import static com.google.common.base.Preconditions.checkNotNull; public class TopicConfigManager extends ConfigManager { - private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; private transient final Lock topicConfigTableLock = new ReentrantLock(); - private ConcurrentMap topicConfigTable = new ConcurrentHashMap<>(1024); + protected ConcurrentMap topicConfigTable = new ConcurrentHashMap<>(1024); private DataVersion dataVersion = new DataVersion(); - private transient BrokerController brokerController; + protected transient BrokerController brokerController; public TopicConfigManager() { + } public TopicConfigManager(BrokerController brokerController) { this.brokerController = brokerController; + } + + @Override + public boolean load() { + super.load(); + init(); + return true; + } + + protected void init() { { String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { @@ -83,7 +96,7 @@ public TopicConfigManager(BrokerController brokerController) { .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } { @@ -92,7 +105,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1024); topicConfig.setWriteQueueNums(1024); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName(); @@ -103,7 +116,7 @@ public TopicConfigManager(BrokerController brokerController) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; } topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { @@ -117,7 +130,7 @@ public TopicConfigManager(BrokerController brokerController) { topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); topicConfig.setPerm(perm); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT; @@ -125,7 +138,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; @@ -133,7 +146,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { @@ -142,7 +155,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } { @@ -151,7 +164,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // PopAckConstants.REVIVE_TOPIC @@ -160,7 +173,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getReviveQueueNum()); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // sync broker member group topic @@ -170,7 +183,7 @@ public TopicConfigManager(BrokerController brokerController) { topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); topicConfig.setPerm(PermName.PERM_INHERIT); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { // TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC @@ -179,7 +192,7 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } { @@ -189,12 +202,24 @@ public TopicConfigManager(BrokerController brokerController) { TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); } } + protected TopicConfig putTopicConfig(TopicConfig topicConfig) { + return this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + } + + protected TopicConfig getTopicConfig(String topicName) { + return this.topicConfigTable.get(topicName); + } + + protected TopicConfig removeTopicConfig(String topicName) { + return this.topicConfigTable.remove(topicName); + } + public TopicConfig selectTopicConfig(final String topic) { - return this.topicConfigTable.get(topic); + return getTopicConfig(topic); } public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, @@ -205,12 +230,12 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(topic); + topicConfig = getTopicConfig(topic); if (topicConfig != null) { return topicConfig; } - TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); + TopicConfig defaultTopicConfig = getTopicConfig(defaultTopic); if (defaultTopicConfig != null) { if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { @@ -247,7 +272,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -287,12 +312,12 @@ public TopicConfig createTopicIfAbsent(TopicConfig topicConfig, boolean register try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - TopicConfig existedTopicConfig = this.topicConfigTable.get(topicConfig.getTopicName()); + TopicConfig existedTopicConfig = getTopicConfig(topicConfig.getTopicName()); if (existedTopicConfig != null) { return existedTopicConfig; } log.info("Create new topic [{}] config:[{}]", topicConfig.getTopicName(), topicConfig); - this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); createNew = true; @@ -305,13 +330,9 @@ public TopicConfig createTopicIfAbsent(TopicConfig topicConfig, boolean register log.error("createTopicIfAbsent ", e); } if (createNew && register) { - if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) { - this.brokerController.registerSingleTopicAll(topicConfig); - } else { - this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); - } + this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion); } - return this.topicConfigTable.get(topicConfig.getTopicName()); + return getTopicConfig(topicConfig.getTopicName()); } public TopicConfig createTopicInSendMessageBackMethod( @@ -328,7 +349,7 @@ public TopicConfig createTopicInSendMessageBackMethod( final int perm, final boolean isOrder, final int topicSysFlag) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { if (isOrder != topicConfig.isOrder()) { topicConfig.setOrder(isOrder); @@ -342,7 +363,7 @@ public TopicConfig createTopicInSendMessageBackMethod( try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(topic); + topicConfig = getTopicConfig(topic); if (topicConfig != null) { return topicConfig; } @@ -355,7 +376,7 @@ public TopicConfig createTopicInSendMessageBackMethod( topicConfig.setOrder(isOrder); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); createNew = true; long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -376,7 +397,7 @@ public TopicConfig createTopicInSendMessageBackMethod( } public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQueueNums, final int perm) { - TopicConfig topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + TopicConfig topicConfig = getTopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -385,7 +406,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue try { if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); + topicConfig = getTopicConfig(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC); if (topicConfig != null) return topicConfig; @@ -396,7 +417,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue topicConfig.setTopicSysFlag(0); log.info("create new topic {}", topicConfig); - this.topicConfigTable.put(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC, topicConfig); + putTopicConfig(topicConfig); createNew = true; long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -418,7 +439,7 @@ public TopicConfig createTopicOfTranCheckMaxTime(final int clientDefaultTopicQue public void updateTopicUnitFlag(final String topic, final boolean unit) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { int oldTopicSysFlag = topicConfig.getTopicSysFlag(); if (unit) { @@ -430,7 +451,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) { log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag()); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -441,7 +462,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) { } public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null) { int oldTopicSysFlag = topicConfig.getTopicSysFlag(); if (hasUnitSub) { @@ -453,7 +474,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag, topicConfig.getTopicSysFlag()); - this.topicConfigTable.put(topic, topicConfig); + putTopicConfig(topicConfig); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; dataVersion.nextVersion(stateMachineVersion); @@ -469,6 +490,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) { Map newAttributes = request(topicConfig); Map currentAttributes = current(topicConfig.getTopicName()); + Map finalAttributes = AttributeUtil.alterCurrentAttributes( this.topicConfigTable.get(topicConfig.getTopicName()) == null, TopicAttributes.ALL, @@ -477,7 +499,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) { topicConfig.setAttributes(finalAttributes); - TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); + TopicConfig old = putTopicConfig(topicConfig); if (old != null) { log.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { @@ -496,7 +518,7 @@ public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { boolean isChange = false; Set orderTopics = orderKVTableFromNs.getTable().keySet(); for (String topic : orderTopics) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; @@ -534,7 +556,7 @@ public Map allAttributes() { } public boolean isOrderTopic(final String topic) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig == null) { return false; } else { @@ -543,7 +565,7 @@ public boolean isOrderTopic(final String topic) { } public void deleteTopicConfig(final String topic) { - TopicConfig old = this.topicConfigTable.remove(topic); + TopicConfig old = removeTopicConfig(topic); if (old != null) { log.info("delete topic config OK, topic: {}", old); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; @@ -619,7 +641,7 @@ private Map request(TopicConfig topicConfig) { } private Map current(String topic) { - TopicConfig topicConfig = this.topicConfigTable.get(topic); + TopicConfig topicConfig = getTopicConfig(topic); if (topicConfig == null) { return new HashMap<>(); } else { diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml index 78b1aea4110..7d49f666426 100644 --- a/broker/src/main/resources/rmq.broker.logback.xml +++ b/broker/src/main/resources/rmq.broker.logback.xml @@ -145,6 +145,39 @@ + + + brokerContainerLogDir + ${file.separator} + + + + + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}rocksdb.log + + true + + + ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}rocksdb.%i.log.gz + + 1 + 10 + + + 128MB + + + %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n + UTF-8 + + + + + + + + brokerContainerLogDir @@ -579,6 +612,10 @@ + + + + diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RocksDBOffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RocksDBOffsetSerializeWrapper.java new file mode 100644 index 00000000000..04cf2f6cd1b --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RocksDBOffsetSerializeWrapper.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.store; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { + private ConcurrentMap offsetTable = new ConcurrentHashMap(16); + + public ConcurrentMap getOffsetTable() { + return offsetTable; + } + + public void setOffsetTable(ConcurrentMap offsetTable) { + this.offsetTable = offsetTable; + } +} diff --git a/common/pom.xml b/common/pom.xml index 9796d1b2dd6..31eb0f087da 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -104,5 +104,9 @@ io.github.aliyunmq rocketmq-logback-classic + + io.github.aliyunmq + rocketmq-rocksdb + diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index f712e1694d8..6c3bed47cf3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Map; + +import org.apache.rocketmq.common.config.RocksDBConfigManager; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -25,7 +27,7 @@ public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - public abstract String encode(); + protected RocksDBConfigManager rocksDBConfigManager; public boolean load() { String fileName = null; @@ -46,8 +48,6 @@ public boolean load() { } } - public abstract String configFilePath(); - private boolean loadBak() { String fileName = null; try { @@ -66,8 +66,6 @@ private boolean loadBak() { return true; } - public abstract void decode(final String jsonString); - public synchronized void persist(String topicName, T t) { // stub for future this.persist(); @@ -90,5 +88,19 @@ public synchronized void persist() { } } + protected void decode0(final byte[] key, final byte[] body) { + + } + + public boolean stop() { + return true; + } + + public abstract String configFilePath(); + + public abstract String encode(); + public abstract String encode(final boolean prettyFormat); + + public abstract void decode(final String jsonString); } diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java new file mode 100644 index 00000000000..e3673baad05 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactionOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.FlushOptions; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.Priority; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; +import org.rocksdb.Status; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import static org.rocksdb.RocksDB.NOT_FOUND; + +public abstract class AbstractRocksDBStorage { + protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); + + private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + private static final String SPACE = " | "; + + protected String dbPath; + protected boolean readOnly; + protected RocksDB db; + protected DBOptions options; + + protected WriteOptions writeOptions; + protected WriteOptions ableWalWriteOptions; + + protected ReadOptions readOptions; + protected ReadOptions totalOrderReadOptions; + + protected CompactionOptions compactionOptions; + protected CompactRangeOptions compactRangeOptions; + + protected ColumnFamilyHandle defaultCFHandle; + protected final List cfOptions = new ArrayList(); + + protected volatile boolean loaded; + private volatile boolean closed; + + private final Semaphore reloadPermit = new Semaphore(1); + private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_")); + private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor( + 1, 1, 1000 * 60, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(1), + new ThreadFactoryImpl("RocksDBManualCompactionService_"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + static { + RocksDB.loadLibrary(); + } + + public boolean hold() { + if (!this.loaded || this.db == null || this.closed) { + LOGGER.error("hold rocksdb Failed. {}", this.dbPath); + return false; + } else { + return true; + } + } + + public void release() { + } + + protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final byte[] keyBytes, final int keyLen, + final byte[] valueBytes, final int valueLen) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.put(cfHandle, writeOptions, keyBytes, 0, keyLen, valueBytes, 0, valueLen); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("put Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.put(cfHandle, writeOptions, keyBB, valueBB); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("put Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void batchPut(WriteOptions writeOptions, final WriteBatch batch) throws RocksDBException { + try { + this.db.write(writeOptions, batch); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("batchPut Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + batch.clear(); + } + } + + protected byte[] get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, byte[] keyBytes) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.get(cfHandle, readOptions, keyBytes); + } catch (RocksDBException e) { + LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected boolean get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, + final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.get(cfHandle, readOptions, keyBB, valueBB) != NOT_FOUND; + } catch (RocksDBException e) { + LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected List multiGet(final ReadOptions readOptions, + final List columnFamilyHandleList, + final List keys) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + return this.db.multiGetAsList(readOptions, columnFamilyHandleList, keys); + } catch (RocksDBException e) { + LOGGER.error("multiGet Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, byte[] keyBytes) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.delete(cfHandle, writeOptions, keyBytes); + } catch (RocksDBException e) { + LOGGER.error("delete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, ByteBuffer keyBB) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.delete(cfHandle, writeOptions, keyBB); + } catch (RocksDBException e) { + LOGGER.error("delete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected WrappedRocksIterator newIterator(ColumnFamilyHandle cfHandle, ReadOptions readOptions) { + return new WrappedRocksIterator(this.db.newIterator(cfHandle, readOptions)); + } + + protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, + final byte[] startKey, final byte[] endKey) throws RocksDBException { + if (!hold()) { + throw new IllegalStateException("rocksDB:" + this + " is not ready"); + } + try { + this.db.deleteRange(cfHandle, writeOptions, startKey, endKey); + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("rangeDelete Failed. {}, {}", this.dbPath, getStatusError(e)); + throw e; + } finally { + release(); + } + } + + protected void manualCompactionDefaultCfMaxLevel(final CompactionOptions compactionOptions) throws Exception { + final ColumnFamilyHandle defaultCFHandle = this.defaultCFHandle; + final byte[] defaultCFName = defaultCFHandle.getName(); + List fileMetaDataList = this.db.getLiveFilesMetaData(); + if (fileMetaDataList == null || fileMetaDataList.isEmpty()) { + return; + } + + List defaultLiveFileDataList = Lists.newArrayList(); + List inputFileNames = Lists.newArrayList(); + int maxLevel = 0; + for (LiveFileMetaData fileMetaData : fileMetaDataList) { + if (compareTo(fileMetaData.columnFamilyName(), defaultCFName) != 0) { + continue; + } + defaultLiveFileDataList.add(fileMetaData); + if (fileMetaData.level() > maxLevel) { + maxLevel = fileMetaData.level(); + } + } + if (maxLevel == 0) { + LOGGER.info("manualCompactionDefaultCfFiles skip level 0."); + return; + } + + for (LiveFileMetaData fileMetaData : defaultLiveFileDataList) { + if (fileMetaData.level() != maxLevel || fileMetaData.beingCompacted()) { + continue; + } + inputFileNames.add(fileMetaData.path() + fileMetaData.fileName()); + } + if (!inputFileNames.isEmpty()) { + List outputLists = this.db.compactFiles(compactionOptions, defaultCFHandle, + inputFileNames, maxLevel, -1, null); + LOGGER.info("manualCompactionDefaultCfFiles OK. src: {}, dst: {}", inputFileNames, outputLists); + } else { + LOGGER.info("manualCompactionDefaultCfFiles Empty."); + } + } + + protected void manualCompactionDefaultCfRange(CompactRangeOptions compactRangeOptions) { + if (!hold()) { + return; + } + long s1 = System.currentTimeMillis(); + boolean result = true; + try { + LOGGER.info("manualCompaction Start. {}", this.dbPath); + this.db.compactRange(this.defaultCFHandle, null, null, compactRangeOptions); + } catch (RocksDBException e) { + result = false; + scheduleReloadRocksdb(e); + LOGGER.error("manualCompaction Failed. {}, {}", this.dbPath, getStatusError(e)); + } finally { + release(); + LOGGER.info("manualCompaction End. {}, rt: {}(ms), result: {}", this.dbPath, System.currentTimeMillis() - s1, result); + } + } + + protected void manualCompaction(long minPhyOffset, final CompactRangeOptions compactRangeOptions) { + this.manualCompactionThread.submit(new Runnable() { + @Override + public void run() { + manualCompactionDefaultCfRange(compactRangeOptions); + } + }); + } + + protected void open(final List cfDescriptors, + final List cfHandles) throws RocksDBException { + if (this.readOnly) { + this.db = RocksDB.openReadOnly(this.options, this.dbPath, cfDescriptors, cfHandles); + } else { + this.db = RocksDB.open(this.options, this.dbPath, cfDescriptors, cfHandles); + } + this.db.getEnv().setBackgroundThreads(8, Priority.HIGH); + this.db.getEnv().setBackgroundThreads(8, Priority.LOW); + + if (this.db == null) { + throw new RocksDBException("open rocksdb null"); + } + } + + protected abstract boolean postLoad(); + + public synchronized boolean start() { + if (this.loaded) { + return true; + } + if (postLoad()) { + this.loaded = true; + LOGGER.info("start OK. {}", this.dbPath); + this.closed = false; + return true; + } else { + return false; + } + } + + protected abstract void preShutdown(); + + public synchronized boolean shutdown() { + try { + if (!this.loaded) { + return true; + } + + final FlushOptions flushOptions = new FlushOptions(); + flushOptions.setWaitForFlush(true); + try { + flush(flushOptions); + } finally { + flushOptions.close(); + } + this.db.cancelAllBackgroundWork(true); + this.db.pauseBackgroundWork(); + //The close order is matter. + //1. close column family handles + preShutdown(); + + this.defaultCFHandle.close(); + //2. close column family options. + for (final ColumnFamilyOptions opt : this.cfOptions) { + opt.close(); + } + //3. close options + if (this.writeOptions != null) { + this.writeOptions.close(); + } + if (this.ableWalWriteOptions != null) { + this.ableWalWriteOptions.close(); + } + if (this.readOptions != null) { + this.readOptions.close(); + } + if (this.totalOrderReadOptions != null) { + this.totalOrderReadOptions.close(); + } + if (this.options != null) { + this.options.close(); + } + //4. close db. + if (db != null) { + this.db.syncWal(); + this.db.closeE(); + } + //5. help gc. + this.cfOptions.clear(); + this.db = null; + this.readOptions = null; + this.totalOrderReadOptions = null; + this.writeOptions = null; + this.ableWalWriteOptions = null; + this.options = null; + + this.loaded = false; + LOGGER.info("shutdown OK. {}", this.dbPath); + } catch (Exception e) { + LOGGER.error("shutdown Failed. {}", this.dbPath, e); + return false; + } + return true; + } + + public void flush(final FlushOptions flushOptions) { + if (!this.loaded || this.readOnly || closed) { + return; + } + + try { + if (db != null) { + this.db.flush(flushOptions); + } + } catch (RocksDBException e) { + scheduleReloadRocksdb(e); + LOGGER.error("flush Failed. {}, {}", this.dbPath, getStatusError(e)); + } + } + + public Statistics getStatistics() { + return this.options.statistics(); + } + + public ColumnFamilyHandle getDefaultCFHandle() { + return defaultCFHandle; + } + + public List getCompactionStatus() { + if (!hold()) { + return null; + } + try { + return this.db.getLiveFilesMetaData(); + } finally { + release(); + } + } + + private void scheduleReloadRocksdb(RocksDBException rocksDBException) { + if (rocksDBException == null || rocksDBException.getStatus() == null) { + return; + } + Status status = rocksDBException.getStatus(); + Status.Code code = status.getCode(); + // Status.Code.Incomplete == code + if (Status.Code.Aborted == code || Status.Code.Corruption == code || Status.Code.Undefined == code) { + LOGGER.error("scheduleReloadRocksdb. {}, {}", this.dbPath, getStatusError(rocksDBException)); + scheduleReloadRocksdb0(); + } + } + + private void scheduleReloadRocksdb0() { + if (!this.reloadPermit.tryAcquire()) { + return; + } + this.closed = true; + this.reloadScheduler.schedule(new Runnable() { + @Override + public void run() { + boolean result = true; + try { + reloadRocksdb(); + } catch (Exception e) { + result = false; + } finally { + reloadPermit.release(); + } + // try to reload rocksdb next time + if (!result) { + LOGGER.info("reload rocksdb Retry. {}", dbPath); + scheduleReloadRocksdb0(); + } + } + }, 10, TimeUnit.SECONDS); + } + + private void reloadRocksdb() throws Exception { + LOGGER.info("reload rocksdb Start. {}", this.dbPath); + if (!shutdown() || !start()) { + LOGGER.error("reload rocksdb Failed. {}", dbPath); + throw new Exception("reload rocksdb Error"); + } + LOGGER.info("reload rocksdb OK. {}", this.dbPath); + } + + public void flushWAL() throws RocksDBException { + this.db.flushWal(true); + } + + protected class WrappedRocksIterator { + private final RocksIterator iterator; + + public WrappedRocksIterator(final RocksIterator iterator) { + this.iterator = iterator; + } + + public byte[] key() { + return iterator.key(); + } + + public byte[] value() { + return iterator.value(); + } + + public void next() { + iterator.next(); + } + + public void prev() { + iterator.prev(); + } + + public void seek(byte[] target) { + iterator.seek(target); + } + + public void seekForPrev(byte[] target) { + iterator.seekForPrev(target); + } + + public void seekToFirst() { + iterator.seekToFirst(); + } + + public boolean isValid() { + return iterator.isValid(); + } + + public void close() { + iterator.close(); + } + } + + private String getStatusError(RocksDBException e) { + if (e == null || e.getStatus() == null) { + return "null"; + } + Status status = e.getStatus(); + StringBuilder sb = new StringBuilder(64); + sb.append("code: "); + if (status.getCode() != null) { + sb.append(status.getCode().name()); + } else { + sb.append("null"); + } + sb.append(", ").append("subCode: "); + if (status.getSubCode() != null) { + sb.append(status.getSubCode().name()); + } else { + sb.append("null"); + } + sb.append(", ").append("state: ").append(status.getState()); + return sb.toString(); + } + + public void statRocksdb(Logger logger) { + try { + + List liveFileMetaDataList = this.getCompactionStatus(); + if (liveFileMetaDataList == null || liveFileMetaDataList.isEmpty()) { + return; + } + Map map = Maps.newHashMap(); + for (LiveFileMetaData metaData : liveFileMetaDataList) { + StringBuilder sb = map.get(metaData.level()); + if (sb == null) { + sb = new StringBuilder(256); + map.put(metaData.level(), sb); + } + sb.append(new String(metaData.columnFamilyName(), CHARSET_UTF8)).append(SPACE). + append(metaData.fileName()).append(SPACE). + append("s: ").append(metaData.size()).append(SPACE). + append("a: ").append(metaData.numEntries()).append(SPACE). + append("r: ").append(metaData.numReadsSampled()).append(SPACE). + append("d: ").append(metaData.numDeletions()).append(SPACE). + append(metaData.beingCompacted()).append("\n"); + } + for (Map.Entry entry : map.entrySet()) { + logger.info("level: {}\n{}", entry.getKey(), entry.getValue().toString()); + } + + String blockCacheMemUsage = this.db.getProperty("rocksdb.block-cache-usage"); + String indexesAndFilterBlockMemUsage = this.db.getProperty("rocksdb.estimate-table-readers-mem"); + String memTableMemUsage = this.db.getProperty("rocksdb.cur-size-all-mem-tables"); + String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); + logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, memtable: {}, blocksPinnedByIterator: {}", + blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); + } catch (Exception ignored) { + } + } + + public int compareTo(byte[] v1, byte[] v2) { + int len1 = v1.length; + int len2 = v2.length; + int lim = Math.min(len1, len2); + + int k = 0; + while (k < lim) { + byte c1 = v1[k]; + byte c2 = v2[k]; + if (c1 != c2) { + return c1 - c2; + } + k++; + } + return len1 - len2; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java new file mode 100644 index 00000000000..e810a637522 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactRangeOptions; +import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction; +import org.rocksdb.CompactionOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.CompressionType; +import org.rocksdb.DBOptions; +import org.rocksdb.DataBlockIndexType; +import org.rocksdb.IndexType; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; +import org.rocksdb.RateLimiter; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.SkipListMemTableConfig; +import org.rocksdb.Statistics; +import org.rocksdb.StatsLevel; +import org.rocksdb.StringAppendOperator; +import org.rocksdb.WALRecoveryMode; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.rocksdb.util.SizeUnit; + +public class ConfigRocksDBStorage extends AbstractRocksDBStorage { + + public ConfigRocksDBStorage(final String dbPath) { + super(); + this.dbPath = dbPath; + this.readOnly = false; + } + + private void initOptions() { + this.options = createConfigDBOptions(); + + this.writeOptions = new WriteOptions(); + this.writeOptions.setSync(false); + this.writeOptions.setDisableWAL(true); + this.writeOptions.setNoSlowdown(true); + + this.ableWalWriteOptions = new WriteOptions(); + this.ableWalWriteOptions.setSync(false); + this.ableWalWriteOptions.setDisableWAL(false); + this.ableWalWriteOptions.setNoSlowdown(true); + + this.readOptions = new ReadOptions(); + this.readOptions.setPrefixSameAsStart(true); + this.readOptions.setTotalOrderSeek(false); + this.readOptions.setTailing(false); + + this.totalOrderReadOptions = new ReadOptions(); + this.totalOrderReadOptions.setPrefixSameAsStart(false); + this.totalOrderReadOptions.setTotalOrderSeek(false); + this.totalOrderReadOptions.setTailing(false); + + this.compactRangeOptions = new CompactRangeOptions(); + this.compactRangeOptions.setBottommostLevelCompaction(BottommostLevelCompaction.kForce); + this.compactRangeOptions.setAllowWriteStall(true); + this.compactRangeOptions.setExclusiveManualCompaction(false); + this.compactRangeOptions.setChangeLevel(true); + this.compactRangeOptions.setTargetLevel(-1); + this.compactRangeOptions.setMaxSubcompactions(4); + + this.compactionOptions = new CompactionOptions(); + this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION); + this.compactionOptions.setMaxSubcompactions(4); + this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L); + } + + @Override + protected boolean postLoad() { + try { + UtilAll.ensureDirOK(this.dbPath); + + initOptions(); + + final List cfDescriptors = new ArrayList(); + + ColumnFamilyOptions defaultOptions = createConfigOptions(); + this.cfOptions.add(defaultOptions); + cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); + + final List cfHandles = new ArrayList(); + open(cfDescriptors, cfHandles); + + this.defaultCFHandle = cfHandles.get(0); + } catch (final Exception e) { + AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e); + return false; + } + return true; + } + + @Override + protected void preShutdown() { + + } + + public ColumnFamilyOptions createConfigOptions() { + BlockBasedTableConfig tconfig = new BlockBasedTableConfig(). + setFormatVersion(5). + setIndexType(IndexType.kBinarySearch). + setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch). + setBlockSize(32 * SizeUnit.KB). + setFilterPolicy(new BloomFilter(16, false)). + // Indicating if we'd put index/filter blocks to the block cache. + setCacheIndexAndFilterBlocks(false). + setCacheIndexAndFilterBlocksWithHighPriority(true). + setPinL0FilterAndIndexBlocksInCache(false). + setPinTopLevelIndexAndFilter(true). + setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)). + setWholeKeyFiltering(true); + + ColumnFamilyOptions options = new ColumnFamilyOptions(); + return options.setMaxWriteBufferNumber(2). + // MemTable size, memtable(cache) -> immutable memtable(cache) -> sst(disk) + setWriteBufferSize(8 * SizeUnit.MB). + setMinWriteBufferNumberToMerge(1). + setTableFormatConfig(tconfig). + setMemTableConfig(new SkipListMemTableConfig()). + setCompressionType(CompressionType.NO_COMPRESSION). + setNumLevels(7). + setCompactionStyle(CompactionStyle.LEVEL). + setLevel0FileNumCompactionTrigger(4). + setLevel0SlowdownWritesTrigger(8). + setLevel0StopWritesTrigger(12). + // The target file size for compaction. + setTargetFileSizeBase(64 * SizeUnit.MB). + setTargetFileSizeMultiplier(2). + // The upper-bound of the total size of L1 files in bytes + setMaxBytesForLevelBase(256 * SizeUnit.MB). + setMaxBytesForLevelMultiplier(2). + setMergeOperator(new StringAppendOperator()). + setInplaceUpdateSupport(true); + } + + public DBOptions createConfigDBOptions() { + //Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide + // and http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java + DBOptions options = new DBOptions(); + Statistics statistics = new Statistics(); + statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS); + return options. + setDbLogDir(getDBLogDir()). + setInfoLogLevel(InfoLogLevel.INFO_LEVEL). + setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords). + setManualWalFlush(true). + setMaxTotalWalSize(500 * SizeUnit.MB). + setWalSizeLimitMB(0). + setWalTtlSeconds(0). + setCreateIfMissing(true). + setCreateMissingColumnFamilies(true). + setMaxOpenFiles(-1). + setMaxLogFileSize(1 * SizeUnit.GB). + setKeepLogFileNum(5). + setMaxManifestFileSize(1 * SizeUnit.GB). + setAllowConcurrentMemtableWrite(false). + setStatistics(statistics). + setStatsDumpPeriodSec(600). + setAtomicFlush(true). + setMaxBackgroundJobs(32). + setMaxSubcompactions(4). + setParanoidChecks(true). + setDelayedWriteRate(16 * SizeUnit.MB). + setRateLimiter(new RateLimiter(100 * SizeUnit.MB)). + setUseDirectIoForFlushAndCompaction(true). + setUseDirectReads(true); + } + + public static String getDBLogDir() { + String rootPath = System.getProperty("user.home"); + if (StringUtils.isEmpty(rootPath)) { + return ""; + } + rootPath = rootPath + File.separator + "logs"; + UtilAll.ensureDirOK(rootPath); + return rootPath + File.separator + "rocketmqlogs" + File.separator; + } + + public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception { + put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length); + } + + public void put(final ByteBuffer keyBB, final ByteBuffer valueBB) throws Exception { + put(this.defaultCFHandle, this.ableWalWriteOptions, keyBB, valueBB); + } + + public byte[] get(final byte[] keyBytes) throws Exception { + return get(this.defaultCFHandle, this.totalOrderReadOptions, keyBytes); + } + + public void delete(final byte[] keyBytes) throws Exception { + delete(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes); + } + + public List multiGet(final List cfhList, final List keys) throws + RocksDBException { + return multiGet(this.totalOrderReadOptions, cfhList, keys); + } + + public void batchPut(final WriteBatch batch) throws RocksDBException { + batchPut(this.writeOptions, batch); + } + + public void batchPutWithWal(final WriteBatch batch) throws RocksDBException { + batchPut(this.ableWalWriteOptions, batch); + } + + public RocksIterator iterator() { + return this.db.newIterator(this.defaultCFHandle, this.totalOrderReadOptions); + } + + public void rangeDelete(final byte[] startKey, final byte[] endKey) throws RocksDBException { + rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey); + } + + public RocksIterator iterator(ReadOptions readOptions) { + return this.db.newIterator(this.defaultCFHandle, readOptions); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java new file mode 100644 index 00000000000..f958bbdf0ba --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.config; + +import java.util.function.BiConsumer; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; + +public class RocksDBConfigManager { + protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + + protected volatile boolean isStop = false; + protected ConfigRocksDBStorage configRocksDBStorage = null; + private FlushOptions flushOptions = null; + private volatile long lastFlushMemTableMicroSecond = 0; + private final long memTableFlushInterval; + + public RocksDBConfigManager(long memTableFlushInterval) { + this.memTableFlushInterval = memTableFlushInterval; + } + + public boolean load(String configFilePath, BiConsumer biConsumer) { + this.isStop = false; + this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath); + if (!this.configRocksDBStorage.start()) { + return false; + } + RocksIterator iterator = this.configRocksDBStorage.iterator(); + try { + iterator.seekToFirst(); + while (iterator.isValid()) { + biConsumer.accept(iterator.key(), iterator.value()); + iterator.next(); + } + } finally { + iterator.close(); + } + + this.flushOptions = new FlushOptions(); + this.flushOptions.setWaitForFlush(false); + this.flushOptions.setAllowWriteStall(false); + return true; + } + + public void start() { + } + + public boolean stop() { + this.isStop = true; + if (this.configRocksDBStorage != null) { + return this.configRocksDBStorage.shutdown(); + } + if (this.flushOptions != null) { + this.flushOptions.close(); + } + return true; + } + + public void flushWAL() { + try { + if (this.isStop) { + return; + } + if (this.configRocksDBStorage != null) { + this.configRocksDBStorage.flushWAL(); + + long now = System.currentTimeMillis(); + if (now > this.lastFlushMemTableMicroSecond + this.memTableFlushInterval) { + this.configRocksDBStorage.flush(this.flushOptions); + this.lastFlushMemTableMicroSecond = now; + } + } + } catch (Exception e) { + BROKER_LOG.error("kv flush WAL Failed.", e); + } + } + + public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception { + this.configRocksDBStorage.put(keyBytes, keyLen, valueBytes); + } + + public void delete(final byte[] keyBytes) throws Exception { + this.configRocksDBStorage.delete(keyBytes); + } + + public void batchPutWithWal(final WriteBatch batch) throws Exception { + this.configRocksDBStorage.batchPutWithWal(batch); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java index c1176ea1534..cb04b00b3ec 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java @@ -51,4 +51,5 @@ public class LoggerName { public static final String PROXY_LOGGER_NAME = "RocketmqProxy"; public static final String PROXY_WATER_MARK_LOGGER_NAME = "RocketmqProxyWatermark"; public static final String ROCKETMQ_COLDCTR_LOGGER_NAME = "RocketmqColdCtr"; + public static final String ROCKSDB_LOGGER_NAME = "RocketmqRocksDB"; } diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index b104016fb56..7ab8109312c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -50,7 +50,7 @@ public static void main(String[] args) throws MQClientException { * */ // Uncomment the following line while debugging, namesrvAddr should be set to your local address -// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); + consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); /* * Specify where to start in case the specific consumer group is a brand-new one. diff --git a/pom.xml b/pom.xml index 4d5dd1deca0..3a08d75f2ba 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ 1.26.0-alpha 2.0.6 2.20.29 + 1.0.3 2.13.4.2 @@ -711,6 +712,11 @@ slf4j-api ${slf4j-api.version} + + io.github.aliyunmq + rocketmq-rocksdb + ${rocksdb.version} + io.github.aliyunmq rocketmq-shaded-slf4j-api-bridge diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreType.java b/store/src/main/java/org/apache/rocketmq/store/StoreType.java new file mode 100644 index 00000000000..4f9c4d0e448 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/StoreType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +public enum StoreType { + DEFAULT("default"), + DEFAULT_ROCKSDB("defaultRocksDB"); + + private String storeType; + + StoreType(String storeType) { + this.storeType = storeType; + } + + public String getStoreType() { + return storeType; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 4f204d7425c..efb728ac04a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.queue.BatchConsumeQueue; public class MessageStoreConfig { @@ -102,6 +103,9 @@ public class MessageStoreConfig { private int timerMetricSmallThreshold = 1000000; private int timerProgressLogIntervalMs = 10 * 1000; + // default, defaultRocksDB + @ImportantField + private String storeType = StoreType.DEFAULT.getStoreType(); // ConsumeQueue file size,default is 30W private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; // enable consume queue ext @@ -392,6 +396,11 @@ public class MessageStoreConfig { private int batchDispatchRequestThreadPoolNums = 16; + // rocksdb mode + private boolean realTimePersistRocksDBConfig = true; + private long memTableFlushInterval = 60 * 60 * 1000L; + private boolean enableRocksDBLog = false; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -488,6 +497,14 @@ public void setMappedFileSizeCommitLog(int mappedFileSizeCommitLog) { this.mappedFileSizeCommitLog = mappedFileSizeCommitLog; } + public String getStoreType() { + return storeType; + } + + public void setStoreType(String storeType) { + this.storeType = storeType; + } + public int getMappedFileSizeConsumeQueue() { int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0)); @@ -1710,4 +1727,28 @@ public int getBatchDispatchRequestThreadPoolNums() { public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) { this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums; } + + public boolean isRealTimePersistRocksDBConfig() { + return realTimePersistRocksDBConfig; + } + + public void setRealTimePersistRocksDBConfig(boolean realTimePersistRocksDBConfig) { + this.realTimePersistRocksDBConfig = realTimePersistRocksDBConfig; + } + + public long getMemTableFlushInterval() { + return memTableFlushInterval; + } + + public void setMemTableFlushInterval(long memTableFlushInterval) { + this.memTableFlushInterval = memTableFlushInterval; + } + + public boolean isEnableRocksDBLog() { + return enableRocksDBLog; + } + + public void setEnableRocksDBLog(boolean enableRocksDBLog) { + this.enableRocksDBLog = enableRocksDBLog; + } }