Skip to content

Commit

Permalink
Lambda sink refactor (opensearch-project#4766)
Browse files Browse the repository at this point in the history
* Lambda sink refactor

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>

* Address comments

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>

---------

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
srikanthjg authored and Krishna Kondaka committed Aug 8, 2024
1 parent 111049d commit 69c839a
Show file tree
Hide file tree
Showing 31 changed files with 87 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.
Expand All @@ -7,7 +8,7 @@ This plugin enables you to send data from your Data Prepper pipeline directly to
lambda-pipeline:
...
sink:
- lambda:
- aws_lambda:
aws:
region: "us-east-1"
sts_role_arn: "<arn>"
Expand All @@ -31,6 +32,6 @@ The integration tests for this plugin do not run as part of the Data Prepper bui
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:lambda-sink:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role
```
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ dependencies {
testAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation testLibs.slf4j.simple
testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2'
}

test {
Expand Down Expand Up @@ -59,9 +61,9 @@ task integrationTest(type: Test) {
classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
systemProperty 'tests.sink.lambda.region', System.getProperty('tests.sink.lambda.region')
systemProperty 'tests.sink.lambda.functionName', System.getProperty('tests.sink.lambda.functionName')
systemProperty 'tests.sink.lambda.sts_role_arn', System.getProperty('tests.sink.lambda.sts_role_arn')
systemProperty 'tests.lambda.sink.region', System.getProperty('tests.lambda.sink.region')
systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn')

filter {
includeTestsMatching '*IT'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;
package org.opensearch.dataprepper.plugins.lambda.sink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -35,8 +35,6 @@
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkConfig;
import org.opensearch.dataprepper.plugins.lambda.sink.LambdaSinkService;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,18 @@ public interface Buffer {
SdkBytes getPayload();

void setEventCount(int eventCount);

//Metrics
public Duration getFlushLambdaSyncLatencyMetric();

public Long getPayloadRequestSyncSize();

public Duration getFlushLambdaAsyncLatencyMetric();

public Long getPayloadResponseSyncSize();

public Long getPayloadRequestAsyncSize();

public Long getPayloadResponseAsyncSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.LambdaException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -31,7 +32,13 @@ public class InMemoryBuffer implements Buffer {
private final String invocationType;
private int eventCount;
private final StopWatch watch;
private final StopWatch lambdaSyncLatencyWatch;
private final StopWatch lambdaAsyncLatencyWatch;
private boolean isCodecStarted;
private long payloadRequestSyncSize;
private long payloadResponseSyncSize;
private long payloadRequestAsyncSize;
private long payloadResponseAsyncSize;


public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String invocationType) {
Expand All @@ -44,6 +51,12 @@ public InMemoryBuffer(LambdaClient lambdaClient, String functionName, String inv
watch = new StopWatch();
watch.start();
isCodecStarted = false;
lambdaSyncLatencyWatch = new StopWatch();
lambdaAsyncLatencyWatch = new StopWatch();
payloadRequestSyncSize = 0;
payloadResponseSyncSize = 0;
payloadRequestAsyncSize = 0;
payloadResponseAsyncSize =0;
}

@Override
Expand All @@ -65,6 +78,7 @@ public Duration getDuration() {
public void flushToLambdaAsync() {
InvokeResponse resp;
SdkBytes payload = getPayload();
payloadRequestAsyncSize = payload.asByteArray().length;

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
Expand All @@ -73,13 +87,17 @@ public void flushToLambdaAsync() {
.invocationType(invocationType)
.build();

lambdaClient.invoke(request);
lambdaAsyncLatencyWatch.start();
resp = lambdaClient.invoke(request);
lambdaAsyncLatencyWatch.stop();
payloadResponseAsyncSize = resp.payload().asByteArray().length;
}

@Override
public InvokeResponse flushToLambdaSync() {
InvokeResponse resp;
InvokeResponse resp = null;
SdkBytes payload = getPayload();
payloadRequestSyncSize = payload.asByteArray().length;

// Setup an InvokeRequest.
InvokeRequest request = InvokeRequest.builder()
Expand All @@ -88,8 +106,16 @@ public InvokeResponse flushToLambdaSync() {
.invocationType(invocationType)
.build();

resp = lambdaClient.invoke(request);
return resp;
lambdaSyncLatencyWatch.start();
try {
resp = lambdaClient.invoke(request);
payloadResponseSyncSize = resp.payload().asByteArray().length;
lambdaSyncLatencyWatch.stop();
return resp;
} catch (LambdaException e){
lambdaSyncLatencyWatch.stop();
throw new RuntimeException(e);
}
}

private SdkBytes validatePayload(String payload_string) {
Expand Down Expand Up @@ -121,6 +147,30 @@ public SdkBytes getPayload() {
byte[] bytes = byteArrayOutputStream.toByteArray();
SdkBytes sdkBytes = SdkBytes.fromByteArray(bytes);
return sdkBytes;
}
}

public Duration getFlushLambdaSyncLatencyMetric (){
return Duration.ofMillis(lambdaSyncLatencyWatch.getTime(TimeUnit.MILLISECONDS));
}

public Duration getFlushLambdaAsyncLatencyMetric (){
return Duration.ofMillis(lambdaAsyncLatencyWatch.getTime(TimeUnit.MILLISECONDS));
}

public Long getPayloadRequestSyncSize() {
return payloadRequestSyncSize;
}

public Long getPayloadResponseSyncSize() {
return payloadResponseSyncSize;
}

public Long getPayloadRequestAsyncSize() {
return payloadRequestAsyncSize;
}

public Long getPayloadResponseAsyncSize() {
return payloadResponseAsyncSize;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

public class ThresholdOptions {

private static final String DEFAULT_BYTE_CAPACITY = "6mb";
private static final String DEFAULT_BYTE_CAPACITY = "3mb";

@JsonProperty("event_count")
@Size(min = 0, max = 10000000, message = "event_count size should be between 0 and 10000000")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import java.util.Collection;

@DataPrepperPlugin(name = "lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
@DataPrepperPlugin(name = "aws_lambda", pluginType = Sink.class, pluginConfigurationType = LambdaSinkConfig.class)
public class LambdaSink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(LambdaSink.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.dataprepper.model.types.ByteCount;

class ThresholdOptionsTest {
private static final String DEFAULT_BYTE_CAPACITY = "6mb";
private static final String DEFAULT_BYTE_CAPACITY = "3mb";
private static final int DEFAULT_EVENT_COUNT = 0;

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class LambdaSinkTest {

public static final String S3_REGION = "us-east-1";
public static final String CODEC_PLUGIN_NAME = "json";
public static final String SINK_PLUGIN_NAME = "lambda";
public static final String SINK_PLUGIN_NAME = "aws_lambda";
public static final String SINK_PIPELINE_NAME = "lambda-sink-pipeline";
private LambdaSinkConfig lambdaSinkConfig;
private LambdaSink lambdaSink;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# To enable mocking of final classes with vanilla Mockito
# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
mock-maker-inline
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ include 'data-prepper-plugins:mongodb'
include 'data-prepper-plugins:rds-source'
include 'data-prepper-plugins:http-source-common'
include 'data-prepper-plugins:http-common'
include 'data-prepper-plugins:lambda'
include 'data-prepper-plugins:aws-lambda'

0 comments on commit 69c839a

Please sign in to comment.