Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make KeyedLock reentrant #27920

Merged
merged 1 commit into from
Dec 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,34 @@
* created the first time they are acquired and removed if no thread hold the
* lock. The latter is important to assure that the list of locks does not grow
* infinitely.
*
* Note: this lock is reentrant
*
* */
public class KeyedLock<T> {
public final class KeyedLock<T> {

private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final boolean fair;

/**
* Creates a new lock
* @param fair Use fair locking, ie threads get the lock in the order they requested it
*/
public KeyedLock(boolean fair) {
this.fair = fair;
}

/**
* Creates a non-fair lock
*/
public KeyedLock() {
this(false);
}

private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

/**
* Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
* by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
*/
public Releasable acquire(T key) {
assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already heald by this thread";
while (true) {
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
Expand All @@ -73,6 +79,9 @@ public Releasable acquire(T key) {
}
}

/**
* Returns <code>true</code> iff the caller thread holds the lock for the given key
*/
public boolean isHeldByCurrentThread(T key) {
KeyLock lock = map.get(key);
if (lock == null) {
Expand All @@ -81,7 +90,7 @@ public boolean isHeldByCurrentThread(T key) {
return lock.isHeldByCurrentThread();
}

void release(T key, KeyLock lock) {
private void release(T key, KeyLock lock) {
assert lock == map.get(key);
lock.unlock();
int decrementAndGet = lock.count.decrementAndGet();
Expand Down Expand Up @@ -118,8 +127,11 @@ private static final class KeyLock extends ReentrantLock {
private final AtomicInteger count = new AtomicInteger(1);
}

/**
* Returns <code>true</code> if this lock has at least one locked key.
*/
public boolean hasLockedKeys() {
return !map.isEmpty();
return map.isEmpty() == false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -67,6 +70,45 @@ public void testIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedExce
}
}

public void testHasLockedKeys() {
KeyedLock<String> lock = new KeyedLock<>();
assertFalse(lock.hasLockedKeys());
Releasable foo = lock.acquire("foo");
assertTrue(lock.hasLockedKeys());
foo.close();
assertFalse(lock.hasLockedKeys());
}

public void testLockIsReentrant() throws InterruptedException {
KeyedLock<String> lock = new KeyedLock<>();
Releasable foo = lock.acquire("foo");
assertTrue(lock.isHeldByCurrentThread("foo"));
assertFalse(lock.isHeldByCurrentThread("bar"));
Releasable foo2 = lock.acquire("foo");
AtomicInteger test = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(() -> {
latch.countDown();
try (Releasable r = lock.acquire("foo")) {
test.incrementAndGet();
}

});
t.start();
latch.await();
Thread.yield();
assertEquals(0, test.get());
List<Releasable> list = Arrays.asList(foo, foo2);
Collections.shuffle(list, random());
list.get(0).close();
Thread.yield();
assertEquals(0, test.get());
list.get(1).close();
t.join();
assertEquals(1, test.get());
assertFalse(lock.hasLockedKeys());
}


public static class AcquireAndReleaseThread extends Thread {
private CountDownLatch startLatch;
Expand Down Expand Up @@ -98,6 +140,12 @@ public void run() {
try (Releasable ignored = connectionLock.acquire(curName)) {
assert connectionLock.isHeldByCurrentThread(curName);
assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
if (randomBoolean()) {
try (Releasable reentrantIgnored = connectionLock.acquire(curName)) {
// just acquire this and make sure we can :)
Thread.yield();
}
}
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
Expand Down