Skip to content

Commit

Permalink
Limit user to single concurrent auth per realm (#30794)
Browse files Browse the repository at this point in the history
This commit reworks the way our realms perform caching in order to
limit each principal to a single ongoing authentication per realm. In
other words, this means that multiple requests made by the same user
will not trigger more that one authentication attempt at a time if no
entry has been stored in the cache. If an entry is present in our
cache, there is no restriction on the number of concurrent
authentications performed for this user.

This change enables us to limit the load we place on an external system
like an LDAP server and also preserve resources such as CPU on
expensive operations such as BCrypt authentication.

Closes #30355
  • Loading branch information
jaymode authored May 24, 2018
1 parent 9cb6b90 commit b3a4acd
Show file tree
Hide file tree
Showing 18 changed files with 713 additions and 228 deletions.
79 changes: 64 additions & 15 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* @param <V> The type of the values
*/
public class Cache<K, V> {

// positive if entries have an expiration
private long expireAfterAccessNanos = -1;

Expand Down Expand Up @@ -282,6 +283,39 @@ void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
}
}

/**
* remove an entry from the segment iff the future is done and the value is equal to the
* expected value
*
* @param key the key of the entry to remove from the cache
* @param value the value expected to be associated with the key
* @param onRemoval a callback for the removed entry
*/
void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
CompletableFuture<Entry<K, V>> future;
boolean removed = false;
try (ReleasableLock ignored = writeLock.acquire()) {
future = map.get(key);
try {
if (future != null) {
if (future.isDone()) {
Entry<K, V> entry = future.get();
if (Objects.equals(value, entry.value)) {
removed = map.remove(key, future);
}
}
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
}

if (future != null && removed) {
segmentStats.eviction();
onRemoval.accept(future);
}
}

private static class SegmentStats {
private final LongAdder hits = new LongAdder();
private final LongAdder misses = new LongAdder();
Expand Down Expand Up @@ -314,7 +348,7 @@ void eviction() {
Entry<K, V> tail;

// lock protecting mutations to the LRU list
private ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());
private final ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());

/**
* Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
Expand Down Expand Up @@ -455,6 +489,19 @@ private void put(K key, V value, long now) {
}
}

private final Consumer<CompletableFuture<Entry<K, V>>> invalidationConsumer = f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
};

/**
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
Expand All @@ -463,18 +510,20 @@ private void put(K key, V value, long now) {
*/
public void invalidate(K key) {
CacheSegment<K, V> segment = getCacheSegment(key);
segment.remove(key, f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
});
segment.remove(key, invalidationConsumer);
}

/**
* Invalidate the entry for the specified key and value. If the value provided is not equal to the value in
* the cache, no removal will occur. A removal notification will be issued for invalidated
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
* @param value the expected value that should be associated with the key
*/
public void invalidate(K key, V value) {
CacheSegment<K, V> segment = getCacheSegment(key);
segment.remove(key, value, invalidationConsumer);
}

/**
Expand Down Expand Up @@ -625,7 +674,7 @@ public void remove() {
Entry<K, V> entry = current;
if (entry != null) {
CacheSegment<K, V> segment = getCacheSegment(entry.key);
segment.remove(entry.key, f -> {});
segment.remove(entry.key, entry.value, f -> {});
try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
Expand Down Expand Up @@ -710,7 +759,7 @@ private void evictEntry(Entry<K, V> entry) {

CacheSegment<K, V> segment = getCacheSegment(entry.key);
if (segment != null) {
segment.remove(entry.key, f -> {});
segment.remove(entry.key, entry.value, f -> {});
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Tuple;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* A future implementation that allows for the result to be passed to listeners waiting for
* notification. This is useful for cases where a computation is requested many times
* concurrently, but really only needs to be performed a single time. Once the computation
* has been performed the registered listeners will be notified by submitting a runnable
* for execution in the provided {@link ExecutorService}. If the computation has already
* been performed, a request to add a listener will simply result in execution of the listener
* on the calling thread.
*/
public final class ListenableFuture<V> extends BaseFuture<V> implements ActionListener<V> {

private volatile boolean done = false;
private final List<Tuple<ActionListener<V>, ExecutorService>> listeners = new ArrayList<>();

/**
* Adds a listener to this future. If the future has not yet completed, the listener will be
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
* If the future has completed, the listener will be notified immediately without forking to
* a different thread.
*/
public void addListener(ActionListener<V> listener, ExecutorService executor) {
if (done) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListener(listener, EsExecutors.newDirectExecutorService());
} else {
final boolean run;
// check done under lock since it could have been modified and protect modifications
// to the list under lock
synchronized (this) {
if (done) {
run = true;
} else {
listeners.add(new Tuple<>(listener, executor));
run = false;
}
}

if (run) {
// run the callback directly, we don't hold the lock and don't need to fork!
notifyListener(listener, EsExecutors.newDirectExecutorService());
}
}
}

@Override
protected synchronized void done() {
done = true;
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
// release references to any listeners as we no longer need them and will live
// much longer than the listeners in most cases
listeners.clear();
}

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.submit(() -> {
try {
// call get in a non-blocking fashion as we could be on a network thread
// or another thread like the scheduler, which we should never block!
V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onResponse(V v) {
final boolean set = set(v);
if (set == false) {
throw new IllegalStateException("did not set value, value or exception already set?");
}
}

@Override
public void onFailure(Exception e) {
final boolean set = setException(e);
if (set == false) {
throw new IllegalStateException("did not set exception, value already set or exception already set?");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,62 @@ public void testNotificationOnInvalidate() {
assertEquals(notifications, invalidated);
}

// randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null
public void testInvalidateWithValue() {
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
Set<Integer> keys = new HashSet<>();
for (Integer key : cache.keys()) {
if (rarely()) {
if (randomBoolean()) {
cache.invalidate(key, key.toString());
keys.add(key);
} else {
// invalidate with incorrect value
cache.invalidate(key, Integer.toString(key * randomIntBetween(2, 10)));
}
}
}
for (int i = 0; i < numberOfEntries; i++) {
if (keys.contains(i)) {
assertNull(cache.get(i));
} else {
assertNotNull(cache.get(i));
}
}
}

// randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only
// those entries
public void testNotificationOnInvalidateWithValue() {
Set<Integer> notifications = new HashSet<>();
Cache<Integer, String> cache =
CacheBuilder.<Integer, String>builder()
.removalListener(notification -> {
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
notifications.add(notification.getKey());
})
.build();
for (int i = 0; i < numberOfEntries; i++) {
cache.put(i, Integer.toString(i));
}
Set<Integer> invalidated = new HashSet<>();
for (int i = 0; i < numberOfEntries; i++) {
if (rarely()) {
if (randomBoolean()) {
cache.invalidate(i, Integer.toString(i));
invalidated.add(i);
} else {
// invalidate with incorrect value
cache.invalidate(i, Integer.toString(i * randomIntBetween(2, 10)));
}
}
}
assertEquals(notifications, invalidated);
}

// invalidate all cached entries, then check that the cache is empty
public void testInvalidateAll() {
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
Expand Down
Loading

0 comments on commit b3a4acd

Please sign in to comment.