Skip to content

Commit

Permalink
Add Aggregate event handle (#4625)
Browse files Browse the repository at this point in the history
Aggregate event handle

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka committed Jun 17, 2024
1 parent 52d2f0e commit a85e05e
Show file tree
Hide file tree
Showing 26 changed files with 375 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package org.opensearch.dataprepper.model.acknowledgements;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.util.function.Consumer;

Expand All @@ -29,32 +26,4 @@ public interface AcknowledgementSetManager {
* @since 2.2
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

/**
* Releases an event's reference
*
* @param eventHandle event handle
* @param success indicates negative or positive acknowledgement
*
* @since 2.2
*/
void releaseEventReference(final EventHandle eventHandle, boolean success);

/**
* Acquires an event's reference
*
* @param eventHandle event handle
*
* @since 2.2
*/
void acquireEventReference(final EventHandle eventHandle);

/**
* Acquires an event's reference
*
* @param event event
*
* @since 2.2
*/
void acquireEventReference(final Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import java.util.ArrayList;
import java.util.List;
import java.time.Instant;
import java.util.function.BiConsumer;

abstract class AbstractEventHandle implements EventHandle, InternalEventHandle {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

AbstractEventHandle(final Instant internalOriginationTime) {
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.releaseConsumers = new ArrayList<>();
}
@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
synchronized (releaseConsumers) {
releaseConsumers.add(releaseConsumer);
}
}

public void notifyReleaseConsumers(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.time.Instant;
import java.io.Serializable;

public class AggregateEventHandle extends AbstractEventHandle implements Serializable {
private List<WeakReference<AcknowledgementSet>> acknowledgementSetRefList;
private Set<Integer> acknowledgementSetHashes;

public AggregateEventHandle(final Instant internalOriginationTime) {
super(internalOriginationTime);
this.acknowledgementSetRefList = new ArrayList<>();
this.acknowledgementSetHashes = new HashSet<>();
}

@Override
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
int hashCode = acknowledgementSet.hashCode();
if (!acknowledgementSetHashes.contains(hashCode)) {
this.acknowledgementSetRefList.add(new WeakReference<>(acknowledgementSet));
acknowledgementSetHashes.add(hashCode);
}
}

@Override
public boolean hasAcknowledgementSet() {
return acknowledgementSetRefList.size() != 0;
}

@Override
public void acquireReference() {
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {;
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}
}

@Override
public boolean release(boolean result) {
notifyReleaseConsumers(result);
boolean returnValue = true;
synchronized (this) {
for (WeakReference<AcknowledgementSet> acknowledgementSetRef: acknowledgementSetRefList) {
AcknowledgementSet acknowledgementSet = acknowledgementSetRef.get();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
} else {
returnValue = false;
}
}
}
return returnValue;
}

// For testing
List<WeakReference<AcknowledgementSet>> getAcknowledgementSetRefs() {
return acknowledgementSetRefList;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,22 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
public class DefaultEventHandle extends AbstractEventHandle implements Serializable {
private WeakReference<AcknowledgementSet> acknowledgementSetRef;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

public DefaultEventHandle(final Instant internalOriginationTime) {
super(internalOriginationTime);
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.releaseConsumers = new ArrayList<>();
}

@Override
public void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
public void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet) {
this.acknowledgementSetRef = new WeakReference<>(acknowledgementSet);
}

@Override
public void setExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
}

public AcknowledgementSet getAcknowledgementSet() {
if (acknowledgementSetRef == null) {
return null;
Expand All @@ -45,32 +32,30 @@ public AcknowledgementSet getAcknowledgementSet() {
}

@Override
public Instant getInternalOriginationTime() {
return this.internalOriginationTime;
public boolean hasAcknowledgementSet() {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
return acknowledgementSet != null;
}

@Override
public Instant getExternalOriginationTime() {
return this.externalOriginationTime;
public void acquireReference() {
synchronized (this) {
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.acquire(this);
}
}
}

@Override
public void release(boolean result) {
synchronized (releaseConsumers) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
}
public boolean release(boolean result) {
notifyReleaseConsumers(result);
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
return true;
}
return false;
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
synchronized (releaseConsumers) {
releaseConsumers.add(releaseConsumer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ public interface EventHandle {
*
* @param result result to be used while releasing. This indicates if
* the operation on the event handle is success or not
* @return returns true if the event handle is released successful, false otherwise
* @since 2.2
*/
void release(boolean result);
boolean release(boolean result);

/**
* sets external origination time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@

public interface InternalEventHandle {
/**
* sets acknowledgement set
* adds acknowledgement set
*
* @param acknowledgementSet acknowledgementSet to be set in the event handle
* @since 2.6
* @since 2.9
*/
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet);
void addAcknowledgementSet(final AcknowledgementSet acknowledgementSet);

/**
* gets acknowledgement set
* Indicates if the event handle has atleast one acknowledgement set
*
* @return returns acknowledgementSet from the event handle
* @since 2.6
* @return returns true if there is at least one acknowledgementSet in the event handle
* @since 2.9
*/
AcknowledgementSet getAcknowledgementSet();
boolean hasAcknowledgementSet();

/**
* Acquires reference to acknowledgement set(s) in the event handle
*
* @since 2.9
*/
void acquireReference();

}

Loading

0 comments on commit a85e05e

Please sign in to comment.