Skip to content

Commit

Permalink
[#78] timed-event-processor: decrease maximum + schedule multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
vbmacher committed May 11, 2023
1 parent 0366965 commit 1e30f2d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import net.emustudio.emulib.runtime.helpers.ReadWriteLockSupport;
import net.jcip.annotations.ThreadSafe;

import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.SortedSet;
Expand Down Expand Up @@ -70,7 +71,7 @@ public class TimedEventsProcessor {
* <p>
* This function is thread-safe.
*
* @param cycles the number of cycles (must be &gt; 0)
* @param cycles schedule at the number of cycles (must be &gt; 0)
* @param event event to be triggered every given cycles
*/
public void schedule(int cycles, Runnable event) {
Expand Down Expand Up @@ -117,7 +118,7 @@ public void schedule(int cycles, Runnable event) {
* <p>
* Given event cannot be removed by calling {@link #remove(int, Runnable) remove()} function.
*
* @param cycles the number of cycles (must be &gt; 0)
* @param cycles schedule at the number of cycles (must be &gt; 0)
* @param event event to be triggered every given cycles
*/
public void scheduleOnce(int cycles, Runnable event) {
Expand All @@ -128,6 +129,25 @@ public void scheduleOnce(int cycles, Runnable event) {
});
}

/**
* Schedule multiple events to be run after given cycles only once (does not ever repeat).
* <p>
* This function is thread-safe.
* <p>
* Given events cannot be removed by calling {@link #remove(int, Runnable) remove()} function.
*
* @param events events to be scheduled once. Keys are the number of cycles (must be &gt; 0), values events themselves.
*/
public void scheduleOnceMultiple(Map<Integer, Runnable> events) {
lockNoRepeat.lockWrite(() -> {
for (Map.Entry<Integer, Runnable> e : events.entrySet()) {
cycleNoRepeatMaximum.updateAndGet(i -> Math.max(i, e.getKey()));
Queue<Runnable> queue = noRepeatEventQueue.computeIfAbsent(e.getKey(), k -> new ConcurrentLinkedQueue<>());
queue.add(e.getValue());
}
});
}

/**
* Remove scheduled event from this processor.
* <p>
Expand All @@ -152,8 +172,16 @@ public void remove(int cycles, Runnable event) {
if (i % cycles == 0 && eventQueue.containsKey(i)) {
Queue<Runnable> iEvent = eventQueue.get(i);
iEvent.remove(event);
if (iEvent.isEmpty()) {
eventQueue.remove(i);
}
}
}
if (eventQueue.isEmpty()) {
cycleMaximum.set(0);
} else {
cycleMaximum.set(eventQueue.lastKey());
}
}
});
}
Expand All @@ -174,6 +202,11 @@ public void removeAll(int cycles) {
eventQueue.remove(i);
}
}
if (eventQueue.isEmpty()) {
cycleMaximum.set(0);
} else {
cycleMaximum.set(eventQueue.lastKey());
}
}
});
}
Expand Down Expand Up @@ -225,24 +258,39 @@ public void advanceClock(int cycles) {

private void advanceNonRepeatedClock(int cycles) {
int currentCycleMaximum = cycleNoRepeatMaximum.get();

if (currentCycleMaximum > 0) {
int newClock = (nonRepeatClock + (cycles % (currentCycleMaximum + 1)));
noRepeatEventQueue
.subMap(nonRepeatClock, newClock + 1)
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
if (newClock > currentCycleMaximum) {
newClock = newClock % currentCycleMaximum;
if (cycles > currentCycleMaximum) {
noRepeatEventQueue
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
nonRepeatClock = 0;
} else {
int newClock = (nonRepeatClock + (cycles % (currentCycleMaximum + 1)));
noRepeatEventQueue
.subMap(0, newClock + 1)
.subMap(nonRepeatClock, newClock + 1)
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
if (newClock > currentCycleMaximum) {
newClock = newClock % currentCycleMaximum;
noRepeatEventQueue
.subMap(0, newClock + 1)
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
}
nonRepeatClock = newClock;
}
if (noRepeatEventQueue.isEmpty()) {
cycleNoRepeatMaximum.set(0);
} else {
cycleNoRepeatMaximum.set(noRepeatEventQueue.lastKey());
}
nonRepeatClock = newClock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.Ignore;
import org.junit.Test;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -171,6 +172,28 @@ public void testScheduleMaximumFillUp() {
assertEquals(6, count.get());
}

@Test
public void testScheduleOnceDecreaseMaximum() {
AtomicInteger count = new AtomicInteger();
tep.scheduleOnce(10, count::incrementAndGet);
tep.advanceClock(18); // should decrease maximum
tep.scheduleOnce(1, count::incrementAndGet);
tep.advanceClock(1);
assertEquals(2, count.get()); // the 1
}

@Test
public void testScheduleDecreaseMaximum() {
AtomicInteger count = new AtomicInteger();
Runnable r1 = count::incrementAndGet;
tep.scheduleOnce(10, r1);
tep.advanceClock(18);
tep.remove(10, r1); // should decrease maximum
tep.scheduleOnce(1, count::incrementAndGet);
tep.advanceClock(1);
assertEquals(2, count.get()); // the 1
}

@Test(expected = IllegalArgumentException.class)
public void testAdvanceNegativeCyclesThrows() {
tep.advanceClock(-1);
Expand All @@ -180,4 +203,16 @@ public void testAdvanceNegativeCyclesThrows() {
public void testAdvanceZeroCyclesThrows() {
tep.advanceClock(0);
}

@Test
public void testScheduleOnceMultiple() {
AtomicInteger count = new AtomicInteger();
tep.scheduleOnceMultiple(Map.of(
2, count::incrementAndGet,
4, count::incrementAndGet,
6, count::incrementAndGet
));
tep.advanceClock(6);
assertEquals(3, count.get());
}
}

0 comments on commit 1e30f2d

Please sign in to comment.