-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Aggregate event handle #4625
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.event; | ||
|
||
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; | ||
import java.lang.ref.WeakReference; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.function.BiConsumer; | ||
import java.time.Instant; | ||
import java.io.Serializable; | ||
|
||
public class AggregateEventHandle implements EventHandle, InternalEventHandle, Serializable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this class be made internal? Are we going to have processors use this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. It will be used by AggregateProcessor. |
||
private Instant externalOriginationTime; | ||
private final Instant internalOriginationTime; | ||
private List<WeakReference<AcknowledgementSet>> acknowledgementSetRefList; | ||
private Set<Integer> acknowledgementSetHashes; | ||
private final ReentrantLock lock; | ||
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers; | ||
|
||
public AggregateEventHandle(final Instant internalOriginationTime) { | ||
this.acknowledgementSetRefList = new ArrayList<>(); | ||
this.externalOriginationTime = null; | ||
this.internalOriginationTime = internalOriginationTime; | ||
this.lock = new ReentrantLock(true); | ||
this.releaseConsumers = new ArrayList<>(); | ||
this.acknowledgementSetHashes = new HashSet<>(); | ||
} | ||
|
||
@Override | ||
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) { | ||
int hashCode = acknowledgementSet.hashCode(); | ||
if (!acknowledgementSetHashes.contains(hashCode)) { | ||
this.acknowledgementSetRefList.add(new WeakReference<>(acknowledgementSet)); | ||
acknowledgementSetHashes.add(hashCode); | ||
} | ||
} | ||
|
||
@Override | ||
public void setExternalOriginationTime(final Instant externalOriginationTime) { | ||
this.externalOriginationTime = externalOriginationTime; | ||
} | ||
|
||
@Override | ||
public boolean hasAcknowledgementSet() { | ||
return acknowledgementSetRefList.size() != 0; | ||
} | ||
|
||
@Override | ||
public Instant getInternalOriginationTime() { | ||
return this.internalOriginationTime; | ||
} | ||
|
||
@Override | ||
public Instant getExternalOriginationTime() { | ||
return this.externalOriginationTime; | ||
} | ||
|
||
@Override | ||
public void acquireReference() { | ||
synchronized (this) { | ||
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You have a dangling semi-colon. |
||
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); | ||
if (acknowledgementSet != null) { | ||
acknowledgementSet.acquire(this); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public boolean release(boolean result) { | ||
synchronized (releaseConsumers) { | ||
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) { | ||
consumer.accept(this, result); | ||
} | ||
} | ||
boolean returnValue = true; | ||
synchronized (this) { | ||
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {; | ||
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get(); | ||
if (acknowledgementSet != null) { | ||
acknowledgementSet.release(this, result); | ||
} else { | ||
returnValue = false; | ||
} | ||
} | ||
} | ||
return returnValue; | ||
} | ||
|
||
// For testing | ||
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() { | ||
return acknowledgementSetRefList; | ||
} | ||
|
||
@Override | ||
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) { | ||
synchronized (releaseConsumers) { | ||
releaseConsumers.add(releaseConsumer); | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.event; | ||
|
||
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import org.junit.jupiter.api.Test; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.doAnswer; | ||
import org.mockito.Mock; | ||
|
||
import java.lang.ref.WeakReference; | ||
import java.time.Instant; | ||
|
||
class AggregateEventHandleTests { | ||
@Mock | ||
private AcknowledgementSet acknowledgementSet1; | ||
@Mock | ||
private AcknowledgementSet acknowledgementSet2; | ||
private int count; | ||
private int acquireCount; | ||
|
||
@Test | ||
void testBasic() { | ||
Instant now = Instant.now(); | ||
AggregateEventHandle eventHandle = new AggregateEventHandle(now); | ||
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); | ||
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); | ||
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false)); | ||
eventHandle.acquireReference(); | ||
eventHandle.release(true); | ||
} | ||
|
||
@Test | ||
void testWithAcknowledgementSet() { | ||
acquireCount = 0; | ||
acknowledgementSet1 = mock(AcknowledgementSet.class); | ||
acknowledgementSet2 = mock(AcknowledgementSet.class); | ||
when(acknowledgementSet1.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); | ||
when(acknowledgementSet2.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true); | ||
doAnswer(a -> { | ||
acquireCount++; | ||
return null; | ||
}).when(acknowledgementSet1).acquire(any(EventHandle.class)); | ||
doAnswer(a -> { | ||
acquireCount++; | ||
return null; | ||
}).when(acknowledgementSet2).acquire(any(EventHandle.class)); | ||
Instant now = Instant.now(); | ||
AggregateEventHandle eventHandle = new AggregateEventHandle(now); | ||
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); | ||
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); | ||
eventHandle.addAcknowledgementSet(acknowledgementSet1); | ||
// just do duplicate add | ||
eventHandle.addAcknowledgementSet(acknowledgementSet1); | ||
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true)); | ||
eventHandle.addAcknowledgementSet(acknowledgementSet2); | ||
eventHandle.acquireReference(); | ||
verify(acknowledgementSet1).acquire(eventHandle); | ||
verify(acknowledgementSet2).acquire(eventHandle); | ||
assertThat(acquireCount, equalTo(2)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need this assertion? And also, why the acquireCount tracking? I believe that lines 67-68 fully assert the same thing. |
||
eventHandle.release(true); | ||
verify(acknowledgementSet1).release(eventHandle, true); | ||
verify(acknowledgementSet2).release(eventHandle, true); | ||
} | ||
|
||
@Test | ||
void testWithExternalOriginationTime() { | ||
Instant now = Instant.now(); | ||
AggregateEventHandle eventHandle = new AggregateEventHandle(now); | ||
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false)); | ||
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now)); | ||
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null)); | ||
eventHandle.setExternalOriginationTime(now.minusSeconds(60)); | ||
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60))); | ||
eventHandle.release(true); | ||
} | ||
|
||
@Test | ||
void testWithOnReleaseHandler() { | ||
Instant now = Instant.now(); | ||
count = 0; | ||
AggregateEventHandle eventHandle = new AggregateEventHandle(now); | ||
acknowledgementSet1 = mock(AcknowledgementSet.class); | ||
acknowledgementSet2 = mock(AcknowledgementSet.class); | ||
eventHandle.onRelease((handle, result) -> {if (result) count++; }); | ||
eventHandle.addAcknowledgementSet(acknowledgementSet1); | ||
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true)); | ||
eventHandle.addAcknowledgementSet(acknowledgementSet2); | ||
// Simulate weak reference object not available for | ||
// verification tests to pass 100% | ||
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: eventHandle.getAcknowledgementSetRefs()) { | ||
if (acknowledgementSetRef.get() == acknowledgementSet2 ) { | ||
acknowledgementSetRef.clear(); | ||
break; | ||
} | ||
} | ||
eventHandle.release(true); | ||
assertThat(count, equalTo(1)); | ||
verify(acknowledgementSet1, times(1)).release(eventHandle, true); | ||
verify(acknowledgementSet2, times(0)).release(eventHandle, true); | ||
|
||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make a package-protected
AbstractEventHandle
that shares some of the logic? This will help clarify the differences between the two and also consolidate similar code.