Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Aggregate event handle #4625

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.dataprepper.model.acknowledgements;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.util.function.Consumer;

Expand All @@ -29,32 +26,4 @@ public interface AcknowledgementSetManager {
* @since 2.2
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

/**
* Releases an event's reference
*
* @param eventHandle event handle
* @param success indicates negative or positive acknowledgement
*
* @since 2.2
*/
void releaseEventReference(final EventHandle eventHandle, boolean success);

/**
* Acquires an event's reference
*
* @param eventHandle event handle
*
* @since 2.2
*/
void acquireEventReference(final EventHandle eventHandle);

/**
* Acquires an event's reference
*
* @param event event
*
* @since 2.2
*/
void acquireEventReference(final Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import java.util.ArrayList;
import java.util.List;
import java.time.Instant;
import java.util.function.BiConsumer;

abstract class AbstractEventHandle implements EventHandle, InternalEventHandle {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

AbstractEventHandle(final Instant internalOriginationTime) {
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.releaseConsumers = new ArrayList<>();
}
@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
synchronized (releaseConsumers) {
releaseConsumers.add(releaseConsumer);
}
}

public void notifyReleaseConsumers(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.time.Instant;
import java.io.Serializable;

public class AggregateEventHandle extends AbstractEventHandle implements Serializable {
private List<WeakReference<AcknowledgementSet>> acknowledgementSetRefList;
private Set<Integer> acknowledgementSetHashes;

public AggregateEventHandle(final Instant internalOriginationTime) {
super(internalOriginationTime);
this.acknowledgementSetRefList = new ArrayList<>();
this.acknowledgementSetHashes = new HashSet<>();
}

@Override
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
int hashCode = acknowledgementSet.hashCode();
if (!acknowledgementSetHashes.contains(hashCode)) {
this.acknowledgementSetRefList.add(new WeakReference<>(acknowledgementSet));
acknowledgementSetHashes.add(hashCode);
}
}

@Override
public boolean hasAcknowledgementSet() {
return acknowledgementSetRefList.size() != 0;
}

@Override
public void acquireReference() {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a dangling semi-colon. {;

AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}
}

@Override
public boolean release(boolean result) {
notifyReleaseConsumers(result);
boolean returnValue = true;
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
} else {
returnValue = false;
}
}
}
return returnValue;
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,19 @@
import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
public class DefaultEventHandle extends AbstractEventHandle implements Serializable {
private WeakReference<AcknowledgementSet> acknowledgementSetRef;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

public DefaultEventHandle(final Instant internalOriginationTime) {
super(internalOriginationTime);
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.releaseConsumers = new ArrayList<>();
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

public AcknowledgementSet getAcknowledgementSet() {
if (acknowledgementSetRef == null) {
return null;
Expand All @@ -45,32 +35,30 @@ public AcknowledgementSet getAcknowledgementSet() {
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
public boolean hasAcknowledgementSet() {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
return acknowledgementSet != null;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
public void acquireReference() {
synchronized (this) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}

@Override
public void release(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
}
public boolean release(boolean result) {
notifyReleaseConsumers(result);
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
return true;
}
return false;
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
synchronized (releaseConsumers) {
releaseConsumers.add(releaseConsumer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ public interface EventHandle {
*
* @param result result to be used while releasing. This indicates if
* the operation on the event handle is success or not
* @return returns true if the event handle is released successful, false otherwise
* @since 2.2
*/
void release(boolean result);
boolean release(boolean result);

/**
* sets external origination time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@

public interface InternalEventHandle {
/**
* sets acknowledgement set
* adds acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
* @since 2.9
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);
void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
* Indicates if the event handle has atleast one acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
* @return returns true if there is at least one acknowledgementSet in the event handle
* @since 2.9
*/
AcknowledgementSet getAcknowledgementSet();
boolean hasAcknowledgementSet();

/**
* Acquires reference to acknowledgement set(s) in the event handle
*
* @since 2.9
*/
void acquireReference();

}

Loading
Loading