Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt at #SERVER-86 -- ConcurrentModificationException #64

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions src/main/java/org/graylog2/database/HostCounterCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

package org.graylog2.database;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* HostCounterCache.java: Feb 21, 2010 4:57:13 PM
Expand All @@ -35,7 +36,7 @@
public class HostCounterCache {
private static HostCounterCache instance;

private Map<String, Integer> cache = new HashMap<String, Integer>();
private ConcurrentMap<String, AtomicInteger> cache = new ConcurrentHashMap<String, AtomicInteger>();

private HostCounterCache() { }

Expand All @@ -56,34 +57,46 @@ public static synchronized HostCounterCache getInstance() {
* @param hostname The host of which the counter to increment.
*/
public void increment(String hostname) {
int old = 0;
//http://stackoverflow.com/questions/2539654/java-concurrency-many-writers-one-reader/2539761#2539761

if (this.cache.containsKey(hostname)) {
old = this.cache.get(hostname);
AtomicInteger counter = cache.get(hostname);
if (counter == null) {
counter = cache.putIfAbsent(hostname, new AtomicInteger(1));
}

this.cache.put(hostname, old+1);
}

/**
* Remove host from counter.
*
* @param hostname The host of which the counter to reset.
*/
public void reset(String hostname) {
if (this.cache.containsKey(hostname)) {
this.cache.remove(hostname);
if (counter != null) {
counter.incrementAndGet();
}
}

/**
* Get the current count of host.
* Get the current count of host and remove host from counter.
*
* @param hostname The host of which the count to get.
* @param hostname The host of which the count to get and counter to reset.
* @return
*/
public int getCount(String hostname) {
return this.cache.get(hostname) == null ? 0 : this.cache.get(hostname);
public int getCountAndReset(String hostname) {
//http://www.javamex.com/tutorials/synchronization_concurrency_8_hashmap2.shtml
//(section: "Truly atomic updates")

int count = 0;

while (true) {
AtomicInteger counter = cache.get(hostname);

if (counter == null) {
break; //_counter_ not found or removed. Exit loop with _count_ unmodified.
}

if (!cache.remove(hostname, counter)) {
continue; //Another thread removed _counter_. Retry loop.
}

count = counter.get();
break; //Successfully removed _counter_. Exit loop with new _count_ value.
}

return count;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public void run() {
try {
MongoBridge m = new MongoBridge();
for (String host : HostCounterCache.getInstance().getAllHosts()) {
m.upsertHostCount(host, HostCounterCache.getInstance().getCount(host));
HostCounterCache.getInstance().reset(host);
m.upsertHostCount(host, HostCounterCache.getInstance().getCountAndReset(host));
}
} catch (Exception e) {
LOG.warn("Error in HostCounterCacheWriterThread: " + e.getMessage(), e);
Expand Down
133 changes: 133 additions & 0 deletions src/test/java/org/graylog2/database/HostCounterCacheTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.graylog2.database;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.Assert;
import org.junit.Test;

/**
* Unit tests for {@link HostCounterCache} class
*
* @author Andrew NS Yeow <ngeesoon80@yahoo.com>
*/
public class HostCounterCacheTest {
@Test
public void testTwoIncrementCallable() throws InterruptedException, ExecutionException, Exception {
//http://stackoverflow.com/a/3768661

ExecutorService executor = Executors.newCachedThreadPool();

final int HOST_COUNT = 10;
final int LOOP_COUNT = 1000;
Callable<Map<String,Integer>> getCount1 = new GetAllHostsAndGetCountAndResetCallable();
Callable<Map<String,Integer>> getCount2 = new GetAllHostsAndGetCountAndResetCallable();
Callable<int[][]> increment1 = new IncrementCallable(HOST_COUNT, LOOP_COUNT, executor, getCount1);
Callable<int[][]> increment2 = new IncrementCallable(HOST_COUNT, LOOP_COUNT, executor, getCount2);

Future<int[][]> incrementSubmit1 = executor.submit(increment1);
Future<int[][]> incrementSubmit2 = executor.submit(increment2);

int[][] generatedAndCollectedCounts1 = incrementSubmit1.get();
int[][] generatedAndCollectedCounts2 = incrementSubmit2.get();

int[] generatedCounts1 = generatedAndCollectedCounts1[0];
int[] generatedCounts2 = generatedAndCollectedCounts2[0];
int[] partailCollectedCounts1 = generatedAndCollectedCounts1[1];
int[] partailCollectedCounts2 = generatedAndCollectedCounts2[1];
int[] cumulativeGeneratedCounts = new int[HOST_COUNT];
int[] cumulativeCollectedCounts = new int[HOST_COUNT];

for (int i = 0; i < HOST_COUNT; i++) {
cumulativeGeneratedCounts[i] += generatedCounts1[i];
cumulativeGeneratedCounts[i] += generatedCounts2[i];
cumulativeCollectedCounts[i] += partailCollectedCounts1[i];
cumulativeCollectedCounts[i] += partailCollectedCounts2[i];
}

Callable<Map<String,Integer>> getCountFinal = new GetAllHostsAndGetCountAndResetCallable();
Map<String, Integer> collectedCountsRemainder = getCountFinal.call();
for (int i = 0; i < HOST_COUNT; i++) {
String HOST = "host" + i;
Integer partialCollectedCountRemainder = collectedCountsRemainder.get(HOST);
if (partialCollectedCountRemainder != null) {
cumulativeCollectedCounts[i] += partialCollectedCountRemainder.intValue();
}
}

Assert.assertArrayEquals("cumulativeCounts", cumulativeGeneratedCounts, cumulativeCollectedCounts);
}

class IncrementCallable implements Callable<int[][]> {
final int HOST_COUNT;
final int LOOP_COUNT;
final ExecutorService executor;
final Callable<Map<String,Integer>> getCountCallable;

IncrementCallable(int HOST_COUNT, int LOOP_COUNT, ExecutorService executor, Callable<Map<String,Integer>> getCountCallable) {
this.HOST_COUNT = HOST_COUNT;
this.LOOP_COUNT = LOOP_COUNT;
this.executor = executor;
this.getCountCallable = getCountCallable;
}

@Override
public int[][] call() throws InterruptedException, ExecutionException {
final Random random = new Random();
final int[] generatedCounts = new int[HOST_COUNT];

final String[] HOSTS = new String[HOST_COUNT];
for (int i = 0; i < HOST_COUNT; i++) {
HOSTS[i] = "host" + i;
}

Future<Map<String,Integer>> getCountSubmit = null;
for (int i = 0; i < LOOP_COUNT; i++) {
if (i == (LOOP_COUNT / 2)) {
getCountSubmit = executor.submit(getCountCallable);
}

int randomHost = random.nextInt(HOST_COUNT);
generatedCounts[randomHost]++;
HostCounterCache.getInstance().increment(HOSTS[randomHost]);
}

Map<String,Integer> partialCollectedCounts = getCountSubmit.get();
int[] intPartialCollectedCounts = new int[HOST_COUNT];
for (int i = 0; i < HOST_COUNT; i++) {
Integer partialCollectedCount = partialCollectedCounts.get(HOSTS[i]);
if (partialCollectedCount != null) {
intPartialCollectedCounts[i] += partialCollectedCount.intValue();
}
}

return new int[][] { generatedCounts, intPartialCollectedCounts };
}
}

class GetAllHostsAndGetCountAndResetCallable implements Callable<Map<String,Integer>> {
@Override
public Map<String,Integer> call() {
final Map<String,Integer> collectedCounts = new HashMap<String,Integer>();

for (String host : HostCounterCache.getInstance().getAllHosts()) {
int newCount = HostCounterCache.getInstance().getCountAndReset(host);

if (!collectedCounts.containsKey(host)) {
collectedCounts.put(host, newCount);
} else {
int oldCount = collectedCounts.get(host);
collectedCounts.put(host, oldCount + newCount);
}
}

return collectedCounts;
}
}
}