Skip to content

Commit

Permalink
Add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Aug 15, 2018
1 parent aad650a commit 07dab4f
Showing 1 changed file with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,12 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger);
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get());
// we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread
final CyclicBarrier barrier = new CyclicBarrier(3);
final int numberOfIterations = randomIntBetween(1, 1024);
final AtomicBoolean closed = new AtomicBoolean();
final Thread updatingThread = new Thread(() -> {
// synchronize starting with the listener thread and the main test thread
awaitQuietly(barrier);
for (int i = 0; i < numberOfIterations; i++) {
if (rarely() && closed.get() == false) {
Expand All @@ -367,11 +369,13 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet());
}
}
// synchronize ending with the listener thread and the main test thread
awaitQuietly(barrier);
});

final List<AtomicBoolean> invocations = new CopyOnWriteArrayList<>();
final Thread listenersThread = new Thread(() -> {
// synchronize starting with the updating thread and the main test thread
awaitQuietly(barrier);
for (int i = 0; i < numberOfIterations; i++) {
final AtomicBoolean invocation = new AtomicBoolean();
Expand All @@ -385,11 +389,14 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio
}
});
}
// synchronize ending with the updating thread and the main test thread
awaitQuietly(barrier);
});
updatingThread.start();
listenersThread.start();
// synchronize starting with the updating thread and the listener thread
barrier.await();
// synchronize ending with the updating thread and the listener thread
barrier.await();
// one last update to ensure all listeners are notified
if (closed.get() == false) {
Expand Down

0 comments on commit 07dab4f

Please sign in to comment.