Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-70-3]Extract adaptive lock mechanism #8663

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.lock.AdaptiveLock;
import org.apache.rocketmq.store.lock.AdaptiveLockImpl;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -92,7 +94,7 @@ public class CommitLog implements Swappable {

private volatile long beginTimeInLock = 0;

protected final PutMessageLock putMessageLock;
protected final AdaptiveLock putMessageLock;

protected final TopicQueueLock topicQueueLock;

Expand Down Expand Up @@ -129,7 +131,7 @@ protected PutMessageThreadLocal initialValue() {
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
}
};
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
this.putMessageLock = new AdaptiveLockImpl(messageStore.getMessageStoreConfig().getTpsSwapCriticalPoint());

this.flushDiskWatcher = new FlushDiskWatcher();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.apache.rocketmq.store;

import org.apache.rocketmq.store.lock.AdaptiveLock;

import java.util.concurrent.locks.ReentrantLock;

/**
* Exclusive lock implementation to put message
*/
public class PutMessageReentrantLock implements PutMessageLock {
public class PutMessageReentrantLock implements PutMessageLock, AdaptiveLock {
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,17 @@ public class MessageStoreConfig {

private boolean putConsumeQueueDataByFileChannel = true;

/**
* Spin number in the retreat strategy of spin lock
* Default is 1000
*/
private int spinLockCollisionRetreatOptimalDegree = 1000;

/*
*Critical TPS that triggers the adaptive locking mechanism switch
*/
private int tpsSwapCriticalPoint = 50000;

public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
Expand Down Expand Up @@ -1854,4 +1865,19 @@ public void setTransferMetadataJsonToRocksdb(boolean transferMetadataJsonToRocks
this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
}

public int getSpinLockCollisionRetreatOptimalDegree() {
return spinLockCollisionRetreatOptimalDegree;
}

public void setSpinLockCollisionRetreatOptimalDegree(int optimalDegree) {
this.spinLockCollisionRetreatOptimalDegree = optimalDegree;
}

public int getTpsSwapCriticalPoint() {
return tpsSwapCriticalPoint;
}

public void setTpsSwapCriticalPoint(int tpsSwapCriticalPoint) {
this.tpsSwapCriticalPoint = tpsSwapCriticalPoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.config.MessageStoreConfig;

public interface AdaptiveLock {

void lock();

void unlock();

default void update(MessageStoreConfig messageStoreConfig) {
}

default void swap() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.PutMessageReentrantLock;
import org.apache.rocketmq.store.config.MessageStoreConfig;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class AdaptiveLockImpl implements AdaptiveLock {
private AdaptiveLock adaptiveLock;

//state
private AtomicBoolean state = new AtomicBoolean(true);

private Map<String, AdaptiveLock> locks;

private final List<AtomicInteger> tpsTable;

private int tpsSwapCriticalPoint;

public AdaptiveLockImpl(int tpsSwapCriticalPoint) {
this.locks = new HashMap<>();
this.locks.put("Reentrant", new PutMessageReentrantLock());
this.locks.put("Collision", new CollisionRetreatLock());

this.tpsTable = new ArrayList<>(2);
this.tpsTable.add(new AtomicInteger(0));
this.tpsTable.add(new AtomicInteger(0));

this.tpsSwapCriticalPoint = tpsSwapCriticalPoint;
adaptiveLock = this.locks.get("Collision");
}

@Override
public void lock() {
tpsTable.get(LocalTime.now().getSecond() % 2).getAndIncrement();
boolean state;
do {
state = this.state.get();
} while (!state);

this.adaptiveLock.lock();
}

@Override
public void unlock() {
boolean state;
do {
state = this.state.get();
} while (!state);

this.adaptiveLock.unlock();
swap();
}

@Override
public void update(MessageStoreConfig messageStoreConfig) {
this.adaptiveLock.update(messageStoreConfig);
}

@Override
public void swap() {
boolean needSwap = false;
int slot = LocalTime.now().getSecond() % 2 - 1 >= 0 ? 0 : 1;
int tps = this.tpsTable.get(slot).get();
this.tpsTable.get(slot).set(-1);
if (tps == -1) {
return;
}
if (tps > this.tpsSwapCriticalPoint) {
if (this.adaptiveLock instanceof CollisionRetreatLock) {
needSwap = true;
}
} else {
if (this.adaptiveLock instanceof PutMessageReentrantLock) {
needSwap = true;
}
}

if (needSwap) {
if (this.state.compareAndSet(true, false)) {
this.adaptiveLock.lock();
if (this.adaptiveLock instanceof CollisionRetreatLock) {
this.adaptiveLock = this.locks.get("Reentrant");
} else {
this.adaptiveLock = this.locks.get("Collision");
}
try {
this.adaptiveLock.unlock();
} catch (Exception ignore) {
//Used to synchronize the lock state,
//ReentrantLock throws an exception when unlock is executed when the lock is free, so it is caught and ignored
}
this.state.compareAndSet(false, true);
}
}
}

public List<AdaptiveLock> getLocks() {
return (List<AdaptiveLock>) this.locks.values();
}

public void setLocks(Map<String, AdaptiveLock> locks) {
this.locks = locks;
}

public boolean getState() {
return this.state.get();
}

public void setState(boolean state) {
this.state.set(state);
}

public AdaptiveLock getAdaptiveLock() {
return adaptiveLock;
}

public List<AtomicInteger> getTpsTable() {
return tpsTable;
}

public void setTpsSwapCriticalPoint(int tpsSwapCriticalPoint) {
this.tpsSwapCriticalPoint = tpsSwapCriticalPoint;
}

public int getTpsSwapCriticalPoint() {
return tpsSwapCriticalPoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.config.MessageStoreConfig;

import java.util.concurrent.atomic.AtomicBoolean;

public class CollisionRetreatLock implements AdaptiveLock {

private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

private int optimalDegree;

public CollisionRetreatLock() {
this.optimalDegree = 1000;
}

public CollisionRetreatLock(int optimalDegree) {
this.optimalDegree = optimalDegree;
}

@Override
public void lock() {
int spinDegree = this.optimalDegree;
while (true) {
for (int i = 0; i < spinDegree; i++) {
if (this.putMessageSpinLock.compareAndSet(true, false)) {
return;
}
}
Thread.yield();
}
}

@Override
public void unlock() {
this.putMessageSpinLock.compareAndSet(false, true);
}

@Override
public void update(MessageStoreConfig messageStoreConfig) {
this.optimalDegree = messageStoreConfig.getSpinLockCollisionRetreatOptimalDegree();
}

public int getOptimalDegree() {
return this.optimalDegree;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.PutMessageReentrantLock;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class AdaptiveLockTest {

AdaptiveLockImpl adaptiveLock;

@Before
public void init() {
adaptiveLock = new AdaptiveLockImpl(50000);
}

@Test
public void testAdaptiveLock() throws InterruptedException {
assertTrue(adaptiveLock.getAdaptiveLock() instanceof CollisionRetreatLock);

for (int i = 0; i < 100000; i++) {
adaptiveLock.lock();
adaptiveLock.unlock();
if (i == 70000) Thread.sleep(1000);
}
assertTrue(adaptiveLock.getAdaptiveLock() instanceof PutMessageReentrantLock);

Thread.sleep(1000L);
adaptiveLock.lock();
adaptiveLock.unlock();
for (int i = 0; i < 300; i++) {
adaptiveLock.lock();
adaptiveLock.unlock();
Thread.sleep(10);
}
assertTrue(adaptiveLock.getAdaptiveLock() instanceof CollisionRetreatLock);

for (int i = 0; i < 100000; i++) {
adaptiveLock.lock();
adaptiveLock.unlock();
if (i == 70000) Thread.sleep(1000);
}
assertTrue(adaptiveLock.getAdaptiveLock() instanceof PutMessageReentrantLock);
}
}
Loading