Skip to content

Commit

Permalink
[#78] timeEventsProcessor optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
vbmacher committed May 11, 2023
1 parent 3734e2f commit 0366965
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import net.emustudio.emulib.runtime.helpers.ReadWriteLockSupport;
import net.jcip.annotations.ThreadSafe;

import java.util.Collection;
import java.util.Queue;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* Timed events processing is a soft real-time system based on a logical system clock,
Expand All @@ -56,12 +54,16 @@ public class TimedEventsProcessor {
private final SortedMap<Integer, Queue<Runnable>> eventQueue = new ConcurrentSkipListMap<>();
private final ReadWriteLockSupport lock = new ReadWriteLockSupport();

// scheduled cycles
private final AtomicInteger cycleNoRepeatMaximum = new AtomicInteger(0);
private final SortedMap<Integer, Queue<Runnable>> noRepeatEventQueue = new ConcurrentSkipListMap<>();
private final ReadWriteLockSupport lockNoRepeat = new ReadWriteLockSupport();

// scheduled (repeated) cycles
private final SortedSet<Integer> usedCycleRoots = new ConcurrentSkipListSet<>();

// used on CPU thread only, don't need to be synchronized
private int clock;
private int lastProcessedCycles = 0;
private int repeatClock;
private int nonRepeatClock;

/**
* Schedule a repeated event to be run every given cycles.
Expand All @@ -77,7 +79,7 @@ public void schedule(int cycles, Runnable event) {
}
lock.lockWrite(() -> {
int oldMaximum = cycleMaximum.getAndUpdate(i -> Math.max(i, cycles));
int newMaximum = Math.max(oldMaximum, cycles);
int newMaximum = cycleMaximum.get();

// 1 1 1 1 1 1 1 1
// 0 2 0 2 0 2 0 2
Expand Down Expand Up @@ -119,14 +121,11 @@ public void schedule(int cycles, Runnable event) {
* @param event event to be triggered every given cycles
*/
public void scheduleOnce(int cycles, Runnable event) {
AtomicReference<Runnable> selfReference = new AtomicReference<>();
Runnable proxy = () -> {
event.run();
remove(cycles, selfReference.get());
};
selfReference.set(proxy);

schedule(cycles, proxy);
lockNoRepeat.lockWrite(() -> {
cycleNoRepeatMaximum.updateAndGet(i -> Math.max(i, cycles));
Queue<Runnable> queue = noRepeatEventQueue.computeIfAbsent(cycles, k -> new ConcurrentLinkedQueue<>());
queue.add(event);
});
}

/**
Expand All @@ -149,8 +148,8 @@ public void remove(int cycles, Runnable event) {
lock.lockWrite(() -> {
if (usedCycleRoots.remove(cycles)) {
int maximum = cycleMaximum.get();
for (int i = cycles; i <= maximum; i++) {
if (i % cycles == 0) {
for (int i = 0; i <= maximum; i++) {
if (i % cycles == 0 && eventQueue.containsKey(i)) {
Queue<Runnable> iEvent = eventQueue.get(i);
iEvent.remove(event);
}
Expand Down Expand Up @@ -188,25 +187,62 @@ public void removeAll(int cycles) {
* @param cycles passed cycles in the system
*/
public void advanceClock(int cycles) {
int currentCycleMaximum = cycleMaximum.get();
clock += cycles;

Collection<Queue<Runnable>> eventsToTrigger = eventQueue
.subMap(lastProcessedCycles, clock + 1)
.values();

if (clock < currentCycleMaximum) {
eventsToTrigger.forEach(e -> e.forEach(Runnable::run));
} else if (currentCycleMaximum > 0) {
for (int i = 0; i < clock / currentCycleMaximum; i++) {
eventsToTrigger.forEach(e -> e.forEach(Runnable::run));
}
if (cycles <= 0) {
throw new IllegalArgumentException("cycles must be > 0");
}
// repeated events first
int currentCycleMaximum = Math.max(1, cycleMaximum.get());
int fullRounds = cycles / currentCycleMaximum;

lastProcessedCycles = clock + 1;
if (clock > currentCycleMaximum) {
clock = (clock % (currentCycleMaximum + 1));
lastProcessedCycles = 0;
for (int i = 0; i < fullRounds; i++) {
eventQueue.forEach((k, v) -> v.forEach(Runnable::run));
}

int oldClock = repeatClock;
// so complex because we don't want integer overflow
repeatClock = (repeatClock + 1 + (cycles % (currentCycleMaximum + 1))) % (currentCycleMaximum + 1);

if (oldClock + 1 < repeatClock) {
eventQueue
.subMap(oldClock + 1, repeatClock)
.values()
.forEach(e -> e.forEach(Runnable::run));
} else if (oldClock == currentCycleMaximum) {
// execute last one
eventQueue
.subMap(currentCycleMaximum, currentCycleMaximum + 1)
.values()
.forEach(e -> e.forEach(Runnable::run));

// and then the rest due to clock overflow
eventQueue
.subMap(0, repeatClock)
.values()
.forEach(e -> e.forEach(Runnable::run));
}
advanceNonRepeatedClock(cycles);
}

private void advanceNonRepeatedClock(int cycles) {
int currentCycleMaximum = cycleNoRepeatMaximum.get();
if (currentCycleMaximum > 0) {
int newClock = (nonRepeatClock + (cycles % (currentCycleMaximum + 1)));
noRepeatEventQueue
.subMap(nonRepeatClock, newClock + 1)
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
if (newClock > currentCycleMaximum) {
newClock = newClock % currentCycleMaximum;
noRepeatEventQueue
.subMap(0, newClock + 1)
.forEach((k, v) -> {
v.forEach(Runnable::run);
noRepeatEventQueue.remove(k);
});
}
nonRepeatClock = newClock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package net.emustudio.emulib.plugins.cpu;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -43,6 +44,8 @@ public void testScheduleAndAdvanceWorks() {
tep.schedule(100, count::incrementAndGet);

tep.advanceClock(5); // should trigger the first event 1x
assertEquals(1, count.get());

tep.advanceClock(44); // should trigger the first event 8x
tep.advanceClock(1); // should trigger the second event 1x and the first event 1x

Expand Down Expand Up @@ -84,6 +87,37 @@ public void testScheduleOnce() {
assertEquals(1, count.get());
}

@Test
public void testScheduleOnceAgain() {
AtomicInteger count = new AtomicInteger();
tep.scheduleOnce(1, count::incrementAndGet);
tep.scheduleOnce(2, count::incrementAndGet);
tep.scheduleOnce(3, count::incrementAndGet);
tep.advanceClock(1);
tep.advanceClock(1);
tep.advanceClock(1);
assertEquals(3, count.get());
}

@Test
public void testScheduleOnceHugeAdvance() {
AtomicInteger count = new AtomicInteger();
tep.scheduleOnce(10, count::incrementAndGet);
tep.advanceClock(9);
tep.advanceClock(Integer.MAX_VALUE);
assertEquals(1, count.get());
}

@Test
public void testScheduleOneCycle() {
AtomicInteger count = new AtomicInteger();
tep.schedule(1, count::incrementAndGet);
tep.advanceClock(1);
tep.advanceClock(1);
tep.advanceClock(1);
assertEquals(3, count.get());
}

@Test
public void testRemoveCyclesRemovesDerivedOnes() {
AtomicInteger count = new AtomicInteger();
Expand All @@ -102,13 +136,31 @@ public void testRemoveAll() {
AtomicInteger count = new AtomicInteger();

tep.schedule(1, count::incrementAndGet);
tep.schedule(10, count::incrementAndGet);
tep.schedule(10, () -> System.out.println("HH"));
tep.removeAll(1);

tep.advanceClock(10);
assertEquals(0, count.get());
}

@Test
public void testScheduleOverflow() {
AtomicInteger count = new AtomicInteger();

tep.schedule(5, count::incrementAndGet);
tep.schedule(9, count::incrementAndGet);
tep.schedule(7, count::incrementAndGet);

tep.advanceClock(5);
assertEquals(1, count.get());

tep.advanceClock(2);
assertEquals(2, count.get());

tep.advanceClock(2);
assertEquals(3, count.get());
}

@Test
public void testScheduleMaximumFillUp() {
AtomicInteger count = new AtomicInteger();
Expand All @@ -118,4 +170,14 @@ public void testScheduleMaximumFillUp() {
tep.advanceClock(10);
assertEquals(6, count.get());
}

@Test(expected = IllegalArgumentException.class)
public void testAdvanceNegativeCyclesThrows() {
tep.advanceClock(-1);
}

@Test(expected = IllegalArgumentException.class)
public void testAdvanceZeroCyclesThrows() {
tep.advanceClock(0);
}
}

0 comments on commit 0366965

Please sign in to comment.