Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Aggregate event handle #4625

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.dataprepper.model.acknowledgements;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.util.function.Consumer;

Expand All @@ -29,32 +26,4 @@ public interface AcknowledgementSetManager {
* @since 2.2
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

/**
* Releases an event's reference
*
* @param eventHandle event handle
* @param success indicates negative or positive acknowledgement
*
* @since 2.2
*/
void releaseEventReference(final EventHandle eventHandle, boolean success);

/**
* Acquires an event's reference
*
* @param eventHandle event handle
*
* @since 2.2
*/
void acquireEventReference(final EventHandle eventHandle);

/**
* Acquires an event's reference
*
* @param event event
*
* @since 2.2
*/
void acquireEventReference(final Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.time.Instant;
import java.io.Serializable;

public class AggregateEventHandle implements EventHandle, InternalEventHandle, Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a package-protected AbstractEventHandle that shares some of the logic? This will help clarify the differences between the two and also consolidate similar code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this class be made internal? Are we going to have processors use this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It will be used by AggregateProcessor.

private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private List<WeakReference<AcknowledgementSet>> acknowledgementSetRefList;
private Set<Integer> acknowledgementSetHashes;
private final ReentrantLock lock;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

public AggregateEventHandle(final Instant internalOriginationTime) {
this.acknowledgementSetRefList = new ArrayList<>();
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.lock = new ReentrantLock(true);
this.releaseConsumers = new ArrayList<>();
this.acknowledgementSetHashes = new HashSet<>();
}

@Override
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
int hashCode = acknowledgementSet.hashCode();
if (!acknowledgementSetHashes.contains(hashCode)) {
this.acknowledgementSetRefList.add(new WeakReference<>(acknowledgementSet));
acknowledgementSetHashes.add(hashCode);
}
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public boolean hasAcknowledgementSet() {
return acknowledgementSetRefList.size() != 0;
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void acquireReference() {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a dangling semi-colon. {;

AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}
}

@Override
public boolean release(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
}
boolean returnValue = true;
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {;
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
} else {
returnValue = false;
}
}
}
return returnValue;
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
synchronized (releaseConsumers) {
releaseConsumers.add(releaseConsumer);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public DefaultEventHandle(final Instant internalOriginationTime) {
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

Expand All @@ -44,6 +44,22 @@ public AcknowledgementSet getAcknowledgementSet() {
return acknowledgementSetRef.get();
}

@Override
public boolean hasAcknowledgementSet() {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
return acknowledgementSet != null;
}

@Override
public void acquireReference() {
synchronized (this) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
Expand All @@ -55,7 +71,7 @@ public Instant getExternalOriginationTime() {
}

@Override
public void release(boolean result) {
public boolean release(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
Expand All @@ -64,7 +80,9 @@ public void release(boolean result) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
return true;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ public interface EventHandle {
*
* @param result result to be used while releasing. This indicates if
* the operation on the event handle is success or not
* @return returns true if the event handle is released successful, false otherwise
* @since 2.2
*/
void release(boolean result);
boolean release(boolean result);

/**
* sets external origination time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@

public interface InternalEventHandle {
/**
* sets acknowledgement set
* adds acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
* @since 2.9
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);
void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
* Indicates if the event handle has atleast one acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
* @return returns true if there is at least one acknowledgementSet in the event handle
* @since 2.9
*/
AcknowledgementSet getAcknowledgementSet();
boolean hasAcknowledgementSet();

/**
* Acquires reference to acknowledgement set(s) in the event handle
*
* @since 2.9
*/
void acquireReference();

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.doAnswer;
import org.mockito.Mock;

import java.lang.ref.WeakReference;
import java.time.Instant;

class AggregateEventHandleTests {
@Mock
private AcknowledgementSet acknowledgementSet1;
@Mock
private AcknowledgementSet acknowledgementSet2;
private int count;
private int acquireCount;

@Test
void testBasic() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false));
eventHandle.acquireReference();
eventHandle.release(true);
}

@Test
void testWithAcknowledgementSet() {
acquireCount = 0;
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
when(acknowledgementSet1.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
when(acknowledgementSet2.release(any(EventHandle.class), any(Boolean.class))).thenReturn(true);
doAnswer(a -> {
acquireCount++;
return null;
}).when(acknowledgementSet1).acquire(any(EventHandle.class));
doAnswer(a -> {
acquireCount++;
return null;
}).when(acknowledgementSet2).acquire(any(EventHandle.class));
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.addAcknowledgementSet(acknowledgementSet1);
// just do duplicate add
eventHandle.addAcknowledgementSet(acknowledgementSet1);
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true));
eventHandle.addAcknowledgementSet(acknowledgementSet2);
eventHandle.acquireReference();
verify(acknowledgementSet1).acquire(eventHandle);
verify(acknowledgementSet2).acquire(eventHandle);
assertThat(acquireCount, equalTo(2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this assertion? And also, why the acquireCount tracking?

I believe that lines 67-68 fully assert the same thing.

eventHandle.release(true);
verify(acknowledgementSet1).release(eventHandle, true);
verify(acknowledgementSet2).release(eventHandle, true);
}

@Test
void testWithExternalOriginationTime() {
Instant now = Instant.now();
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(false));
assertThat(eventHandle.getInternalOriginationTime(), equalTo(now));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(null));
eventHandle.setExternalOriginationTime(now.minusSeconds(60));
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60)));
eventHandle.release(true);
}

@Test
void testWithOnReleaseHandler() {
Instant now = Instant.now();
count = 0;
AggregateEventHandle eventHandle = new AggregateEventHandle(now);
acknowledgementSet1 = mock(AcknowledgementSet.class);
acknowledgementSet2 = mock(AcknowledgementSet.class);
eventHandle.onRelease((handle, result) -> {if (result) count++; });
eventHandle.addAcknowledgementSet(acknowledgementSet1);
assertThat(eventHandle.hasAcknowledgementSet(), equalTo(true));
eventHandle.addAcknowledgementSet(acknowledgementSet2);
// Simulate weak reference object not available for
// verification tests to pass 100%
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: eventHandle.getAcknowledgementSetRefs()) {
if (acknowledgementSetRef.get() == acknowledgementSet2 ) {
acknowledgementSetRef.clear();
break;
}
}
eventHandle.release(true);
assertThat(count, equalTo(1));
verify(acknowledgementSet1, times(1)).release(eventHandle, true);
verify(acknowledgementSet2, times(0)).release(eventHandle, true);

}

}

Loading
Loading