Skip to content

Commit

Permalink
REF: service-map processor with the latest config model (#4734)
Browse files Browse the repository at this point in the history
* REF: service-map processor with the latest config model

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Jul 15, 2024
1 parent aeac953 commit c4455a7
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 26 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/service-map-stateful/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
exclude group: 'com.google.protobuf', module: 'protobuf-java'
}
implementation libs.protobuf.core
testImplementation project(':data-prepper-test-common')
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,20 @@

package org.opensearch.dataprepper.plugins.processor;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

public class ServiceMapProcessorConfig {
static final String WINDOW_DURATION = "window_duration";
private static final String WINDOW_DURATION = "window_duration";
static final int DEFAULT_WINDOW_DURATION = 180;
static final String DEFAULT_DB_PATH = "data/service-map/";

@JsonProperty(WINDOW_DURATION)
@JsonPropertyDescription("Represents the fixed time window, in seconds, " +
"during which service map relationships are evaluated. Default value is 180.")
private int windowDuration = DEFAULT_WINDOW_DURATION;

public int getWindowDuration() {
return windowDuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.dataprepper.plugins.processor;

import org.apache.commons.codec.DecoderException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
Expand Down Expand Up @@ -40,7 +42,8 @@
import java.util.concurrent.atomic.AtomicInteger;

@SingleThread
@DataPrepperPlugin(name = "service_map", deprecatedName = "service_map_stateful", pluginType = Processor.class)
@DataPrepperPlugin(name = "service_map", deprecatedName = "service_map_stateful", pluginType = Processor.class,
pluginConfigurationType = ServiceMapProcessorConfig.class)
public class ServiceMapStatefulProcessor extends AbstractProcessor<Record<Event>, Record<Event>> implements RequiresPeerForwarding {

static final String SPANS_DB_SIZE = "spansDbSize";
Expand Down Expand Up @@ -75,20 +78,24 @@ public class ServiceMapStatefulProcessor extends AbstractProcessor<Record<Event>

private final int thisProcessorId;

public ServiceMapStatefulProcessor(final PluginSetting pluginSetting) {
this(pluginSetting.getIntegerOrDefault(ServiceMapProcessorConfig.WINDOW_DURATION, ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION) * TO_MILLIS,
@DataPrepperPluginConstructor
public ServiceMapStatefulProcessor(
final ServiceMapProcessorConfig serviceMapProcessorConfig,
final PluginMetrics pluginMetrics,
final PipelineDescription pipelineDescription) {
this((long) serviceMapProcessorConfig.getWindowDuration() * TO_MILLIS,
new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH),
Clock.systemUTC(),
pluginSetting.getNumberOfProcessWorkers(),
pluginSetting);
pipelineDescription.getNumberOfProcessWorkers(),
pluginMetrics);
}

public ServiceMapStatefulProcessor(final long windowDurationMillis,
ServiceMapStatefulProcessor(final long windowDurationMillis,
final File databasePath,
final Clock clock,
final int processWorkers,
final PluginSetting pluginSetting) {
super(pluginSetting);
final PluginMetrics pluginMetrics) {
super(pluginMetrics);

ServiceMapStatefulProcessor.clock = clock;
this.thisProcessorId = processorsCreated.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.dataprepper.plugins.processor;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

import java.util.Random;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.opensearch.dataprepper.plugins.processor.ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION;

class ServiceMapProcessorConfigTest {
private ServiceMapProcessorConfig serviceMapProcessorConfig;
Random random;

@BeforeEach
void setUp() {
serviceMapProcessorConfig = new ServiceMapProcessorConfig();
random = new Random();
}

@Test
void testDefaultConfig() {
assertThat(serviceMapProcessorConfig.getWindowDuration(), equalTo(DEFAULT_WINDOW_DURATION));
}

@Test
void testGetter() throws NoSuchFieldException, IllegalAccessException {
final int windowDuration = 1 + random.nextInt(300);
ReflectivelySetField.setField(
ServiceMapProcessorConfig.class,
serviceMapProcessorConfig,
"windowDuration",
windowDuration);
assertThat(serviceMapProcessorConfig.getWindowDuration(), equalTo(windowDuration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.mockito.Mockito;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.trace.Span;
Expand Down Expand Up @@ -43,6 +45,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.ServiceMapProcessorConfig.DEFAULT_WINDOW_DURATION;


public class ServiceMapStatefulProcessorTest {
Expand All @@ -54,12 +57,20 @@ public class ServiceMapStatefulProcessorTest {
private static final String PAYMENT_SERVICE = "PAY";
private static final String CART_SERVICE = "CART";
private PluginSetting pluginSetting;
private PluginMetrics pluginMetrics;
private PipelineDescription pipelineDescription;
private ServiceMapProcessorConfig serviceMapProcessorConfig;

@BeforeEach
public void setup() throws NoSuchFieldException, IllegalAccessException {
resetServiceMapStatefulProcessorStatic();
MetricsTestUtil.initMetrics();
pluginSetting = mock(PluginSetting.class);
pipelineDescription = mock(PipelineDescription.class);
serviceMapProcessorConfig = mock(ServiceMapProcessorConfig.class);
when(serviceMapProcessorConfig.getWindowDuration()).thenReturn(DEFAULT_WINDOW_DURATION);
pluginMetrics = PluginMetrics.fromNames(
"testServiceMapProcessor", "testPipelineName");
when(pluginSetting.getName()).thenReturn("testServiceMapProcessor");
when(pluginSetting.getPipelineName()).thenReturn("testPipelineName");
}
Expand Down Expand Up @@ -116,13 +127,11 @@ private Set<ServiceMapSourceDest> evaluateEdges(Set<ServiceMapRelationship> serv
}

@Test
public void testPluginSettingConstructor() {

final PluginSetting pluginSetting = new PluginSetting("testPluginSetting", Collections.emptyMap());
pluginSetting.setProcessWorkers(4);
pluginSetting.setPipelineName("TestPipeline");
public void testDataPrepperConstructor() {
when(pipelineDescription.getNumberOfProcessWorkers()).thenReturn(4);
//Nothing is accessible to validate, so just verify that no exception is thrown.
final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(pluginSetting);
final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(
serviceMapProcessorConfig, pluginMetrics, pipelineDescription);
}

@Test
Expand All @@ -132,8 +141,8 @@ public void testTraceGroupsWithEventRecordData() throws Exception {
Mockito.when(clock.instant()).thenReturn(Instant.now());
ExecutorService threadpool = Executors.newCachedThreadPool();
final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH);
final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting);
final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting);
final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics);
final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics);

final byte[] rootSpanId1Bytes = ServiceMapTestUtils.getRandomBytes(8);
final byte[] rootSpanId2Bytes = ServiceMapTestUtils.getRandomBytes(8);
Expand Down Expand Up @@ -327,8 +336,8 @@ public void testTraceGroupsWithIsolatedServiceEventRecordData() throws Exception
Mockito.when(clock.instant()).thenReturn(Instant.now());
ExecutorService threadpool = Executors.newCachedThreadPool();
final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH);
final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting);
final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginSetting);
final ServiceMapStatefulProcessor serviceMapStateful1 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics);
final ServiceMapStatefulProcessor serviceMapStateful2 = new ServiceMapStatefulProcessor(100, path, clock, 2, pluginMetrics);

final byte[] rootSpanIdBytes = ServiceMapTestUtils.getRandomBytes(8);
final byte[] traceIdBytes = ServiceMapTestUtils.getRandomBytes(16);
Expand Down Expand Up @@ -383,7 +392,7 @@ public void testTraceGroupsWithIsolatedServiceEventRecordData() throws Exception
@Test
public void testPrepareForShutdownWithEventRecordData() {
final File path = new File(ServiceMapProcessorConfig.DEFAULT_DB_PATH);
final ServiceMapStatefulProcessor serviceMapStateful = new ServiceMapStatefulProcessor(100, path, Clock.systemUTC(), 1, pluginSetting);
final ServiceMapStatefulProcessor serviceMapStateful = new ServiceMapStatefulProcessor(100, path, Clock.systemUTC(), 1, pluginMetrics);

final byte[] rootSpanId1Bytes = ServiceMapTestUtils.getRandomBytes(8);
final byte[] traceId1Bytes = ServiceMapTestUtils.getRandomBytes(16);
Expand Down Expand Up @@ -411,11 +420,9 @@ public void testPrepareForShutdownWithEventRecordData() {

@Test
public void testGetIdentificationKeys() {
final PluginSetting pluginSetting = new PluginSetting("testPluginSetting", Collections.emptyMap());
pluginSetting.setProcessWorkers(4);
pluginSetting.setPipelineName("TestPipeline");

final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(pluginSetting);
when(pipelineDescription.getNumberOfProcessWorkers()).thenReturn(4);
final ServiceMapStatefulProcessor serviceMapStatefulProcessor = new ServiceMapStatefulProcessor(
serviceMapProcessorConfig, pluginMetrics, pipelineDescription);
final Collection<String> expectedIdentificationKeys = serviceMapStatefulProcessor.getIdentificationKeys();

assertThat(expectedIdentificationKeys, equalTo(Collections.singleton("traceId")));
Expand Down

0 comments on commit c4455a7

Please sign in to comment.