Skip to content

Commit

Permalink
Add Internal and external latency to OpenSearch and S3 sinks. (#3583)
Browse files Browse the repository at this point in the history
Add Internal and external latency to OpenSearch and S3 sinks

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka committed Nov 7, 2023
1 parent 2d7e4d3 commit a40f4b8
Show file tree
Hide file tree
Showing 19 changed files with 388 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.time.Instant;
import java.io.Serializable;

public class DefaultEventHandle implements EventHandle, InternalEventHandle, Serializable {
private Instant externalOriginationTime;
private final Instant internalOriginationTime;
private WeakReference<AcknowledgementSet> acknowledgementSetRef;
private List<BiConsumer<EventHandle, Boolean>> releaseConsumers;

public DefaultEventHandle(final Instant internalOriginationTime) {
this.acknowledgementSetRef = null;
this.externalOriginationTime = null;
this.internalOriginationTime = internalOriginationTime;
this.releaseConsumers = new ArrayList<>();
}

@Override
Expand Down Expand Up @@ -51,9 +56,17 @@ public Instant getExternalOriginationTime() {

@Override
public void release(boolean result) {
for (final BiConsumer<EventHandle, Boolean> consumer: releaseConsumers) {
consumer.accept(this, result);
}
AcknowledgementSet acknowledgementSet = getAcknowledgementSet();
if (acknowledgementSet != null) {
acknowledgementSet.release(this, result);
}
}

@Override
public void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer) {
releaseConsumers.add(releaseConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.model.event;

import java.time.Instant;
import java.util.function.BiConsumer;

public interface EventHandle {
/**
Expand Down Expand Up @@ -41,5 +42,6 @@ public interface EventHandle {
*/
Instant getInternalOriginationTime();

void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;

import java.util.Collection;

Expand All @@ -22,6 +23,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
protected static final int DEFAULT_WAIT_TIME_MS = 1000;
protected final PluginMetrics pluginMetrics;
private final Counter recordsInCounter;
private final SinkLatencyMetrics latencyMetrics;
private final Timer timeElapsedTimer;
private Thread retryThread;
private int maxRetries;
Expand All @@ -31,6 +33,7 @@ public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitT
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
recordsInCounter = pluginMetrics.counter(MetricNames.RECORDS_IN);
timeElapsedTimer = pluginMetrics.timer(MetricNames.TIME_ELAPSED);
this.latencyMetrics = new SinkLatencyMetrics(pluginMetrics);
retryThread = null;
this.maxRetries = numRetries;
this.waitTimeMs = waitTimeMs;
Expand Down Expand Up @@ -77,6 +80,20 @@ public void shutdown() {
}
}

@Override
public void updateLatencyMetrics(Collection<T> records) {
for (final Record record : records) {
if (record.getData() instanceof Event) {
Event event = (Event)record.getData();
event.getEventHandle().onRelease((eventHandle, result) -> {
if (result) {
latencyMetrics.update(eventHandle);
}
});
}
}
}

Thread.State getRetryThreadState() {
if (retryThread != null) {
return retryThread.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@ public interface Sink<T extends Record<?>> {
*/
boolean isReady();

/**
* updates latency metrics of sink
*
* @param events list of events used for updating the latency metrics
*/
default void updateLatencyMetrics(final Collection<T> events) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.time.Instant;

public class SinkLatencyMetrics {
public static final String INTERNAL_LATENCY = "internalLatency";
public static final String EXTERNAL_LATENCY = "externalLatency";
private final DistributionSummary internalLatencySummary;
private final DistributionSummary externalLatencySummary;

public SinkLatencyMetrics(PluginMetrics pluginMetrics) {
internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY);
externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY);
}
public void update(final EventHandle eventHandle) {
Instant now = Instant.now();
internalLatencySummary.record(Duration.between(eventHandle.getInternalOriginationTime(), now).toMillis());
if (eventHandle.getExternalOriginationTime() == null) {
return;
}
externalLatencySummary.record(Duration.between(eventHandle.getExternalOriginationTime(), now).toMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
class DefaultEventHandleTests {
@Mock
private AcknowledgementSet acknowledgementSet;
private int count;

@Test
void testBasic() {
Expand Down Expand Up @@ -56,4 +57,16 @@ void testWithExternalOriginationTime() {
assertThat(eventHandle.getExternalOriginationTime(), equalTo(now.minusSeconds(60)));
eventHandle.release(true);
}

@Test
void testWithOnReleaseHandler() {
Instant now = Instant.now();
count = 0;
DefaultEventHandle eventHandle = new DefaultEventHandle(now);
eventHandle.onRelease((handle, result) -> {if (result) count++; });
eventHandle.release(true);
assertThat(count, equalTo(1));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -25,6 +31,7 @@
import static org.awaitility.Awaitility.await;

public class AbstractSinkTest {
private int count;
@Test
public void testMetrics() {
final String sinkName = "testSink";
Expand All @@ -35,6 +42,8 @@ public void testMetrics() {
AbstractSink<Record<String>> abstractSink = new AbstractSinkImpl(pluginSetting);
abstractSink.initialize();
Assert.assertEquals(abstractSink.isReady(), true);
abstractSink.updateLatencyMetrics(Arrays.asList(
new Record<>(UUID.randomUUID().toString())));
abstractSink.output(Arrays.asList(
new Record<>(UUID.randomUUID().toString()),
new Record<>(UUID.randomUUID().toString()),
Expand Down Expand Up @@ -80,6 +89,61 @@ public void testSinkNotReady() {
abstractSink.shutdown();
}

@Test
public void testSinkWithRegisterEventReleaseHandler() {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<Event>> abstractSink = new AbstractEventSinkImpl(pluginSetting);
abstractSink.initialize();
Assert.assertEquals(abstractSink.isReady(), true);
count = 0;
Event event = JacksonEvent.builder()
.withEventType("event")
.build();
Record record = mock(Record.class);
EventHandle eventHandle = mock(EventHandle.class);
when(record.getData()).thenReturn(event);

abstractSink.updateLatencyMetrics(Arrays.asList(record));
abstractSink.output(Arrays.asList(record));
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
abstractSink.shutdown();
}

private static class AbstractEventSinkImpl extends AbstractSink<Record<Event>> {

public AbstractEventSinkImpl(PluginSetting pluginSetting) {
super(pluginSetting, 10, 1000);
}

@Override
public void doOutput(Collection<Record<Event>> records) {
for (final Record<Event> record: records) {
Event event = record.getData();
event.getEventHandle().release(true);
}
}

@Override
public void shutdown() {
super.shutdown();
}

@Override
public void doInitialize() {
}

@Override
public boolean isReady() {
return true;
}
}


private static class AbstractSinkImpl extends AbstractSink<Record<String>> {

public AbstractSinkImpl(PluginSetting pluginSetting) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.EventHandle;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.instrument.DistributionSummary;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import java.time.Instant;

class SinkLatencyMetricsTest {

private PluginMetrics pluginMetrics;
private EventHandle eventHandle;
private SinkLatencyMetrics latencyMetrics;
private DistributionSummary internalLatencySummary;
private DistributionSummary externalLatencySummary;

public SinkLatencyMetrics createObjectUnderTest() {
return new SinkLatencyMetrics(pluginMetrics);
}

@BeforeEach
void setup() {
pluginMetrics = mock(PluginMetrics.class);
SimpleMeterRegistry registry = new SimpleMeterRegistry();
internalLatencySummary = DistributionSummary
.builder("internalLatency")
.baseUnit("milliseconds")
.register(registry);
externalLatencySummary = DistributionSummary
.builder("externalLatency")
.baseUnit("milliseconds")
.register(registry);
when(pluginMetrics.summary(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencySummary);
when(pluginMetrics.summary(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencySummary);
eventHandle = mock(EventHandle.class);
when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now());
latencyMetrics = createObjectUnderTest();
}

@Test
public void testInternalOriginationTime() {
latencyMetrics.update(eventHandle);
assertThat(internalLatencySummary.count(), equalTo(1L));
}

@Test
public void testExternalOriginationTime() {
when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now().minusMillis(10));
latencyMetrics.update(eventHandle);
assertThat(internalLatencySummary.count(), equalTo(1L));
assertThat(externalLatencySummary.count(), equalTo(1L));
assertThat(externalLatencySummary.max(), greaterThanOrEqualTo(10.0));
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.Collections;

public class SinkTest {
private static class SinkTestClass implements Sink<Record<?>> {

@Override
public boolean isReady() {
return true;
}

@Override
public void shutdown() {
}

@Override
public void initialize() {
}

@Override
public void output(Collection<Record<?>> records) {
}

};

SinkTestClass sink;

@Test
public void testSinkUpdateLatencyMetrics() {
sink = new SinkTestClass();
sink.updateLatencyMetrics(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,11 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {
InactiveAcknowledgementSetManager.getInstance(),
sinks);
router.route(records, sinks, getRecordStrategy, (sink, events) ->
sinkFutures.add(sinkExecutorService.submit(() -> sink.output(events), null))
);
sinkFutures.add(sinkExecutorService.submit(() -> {
sink.updateLatencyMetrics(events);
sink.output(events);
}, null))
);
return sinkFutures;
}
}
Loading

0 comments on commit a40f4b8

Please sign in to comment.