Skip to content

Commit

Permalink
Use a sliding window to track acknowledged values.
Browse files Browse the repository at this point in the history
  • Loading branch information
uncle-betty committed Sep 21, 2015
1 parent 9b13582 commit f474adb
Showing 1 changed file with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.yahoo.ycsb.generator;

import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
* A CounterGenerator that reports generated integers via lastInt()
* only after they have been acknowledged.
*/
public class AcknowledgedCounterGenerator extends CounterGenerator
{
private PriorityQueue<Integer> ack;
private static final int WINDOW_SIZE = 10000;

private ReentrantLock lock;
private boolean[] window;
private int limit;

/**
Expand All @@ -17,7 +20,8 @@ public class AcknowledgedCounterGenerator extends CounterGenerator
public AcknowledgedCounterGenerator(int countstart)
{
super(countstart);
ack = new PriorityQueue<Integer>();
lock = new ReentrantLock();
window = new boolean[WINDOW_SIZE];
limit = countstart - 1;
}

Expand All @@ -34,17 +38,35 @@ public int lastInt()
/**
* Make a generated counter value available via lastInt().
*/
public synchronized void acknowledge(int value)
public void acknowledge(int value)
{
ack.add(value);
if (value > limit + WINDOW_SIZE) {
throw new RuntimeException("This should be a different exception.");
}

window[value % WINDOW_SIZE] = true;

if (lock.tryLock()) {
// move a contiguous sequence from the window
// over to the "limit" variable

try {
int index;

for (index = limit + 1; index <= value; ++index) {
int slot = index % WINDOW_SIZE;

// move a contiguous sequence from the priority queue
// over to the "limit" variable
if (!window[slot]) {
break;
}

Integer min;
window[slot] = false;
}

while ((min = ack.peek()) != null && min == limit + 1) {
limit = ack.poll();
limit = index - 1;
} finally {
lock.unlock();
}
}
}
}

0 comments on commit f474adb

Please sign in to comment.