Skip to content

Commit

Permalink
Add Lambda Processor Synchronous Mode support
Browse files Browse the repository at this point in the history
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
  • Loading branch information
srikanthjg committed Aug 6, 2024
1 parent 642db0d commit 0cf675b
Show file tree
Hide file tree
Showing 10 changed files with 998 additions and 0 deletions.
35 changes: 35 additions & 0 deletions data-prepper-plugins/aws-lambda/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,39 @@

# Lambda Processor

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```yaml
lambda-pipeline:
  ...
  processor:
    - aws_lambda:
        aws:
          region: "us-east-1"
          sts_role_arn: "<arn>"
        function_name: "uploadToS3Lambda"
        max_retries: 3
        mode: "synchronous"
        batch:
          batch_key: "osi_key"
          threshold:
            event_count: 3
            maximum_size: 6mb
            event_collect_timeout: 15s
```

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:aws-lambda:integrationTest -Dtests.lambda.processor.region="us-east-1" -Dtests.lambda.processor.functionName="lambda_test_function" -Dtests.lambda.processor.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role"
```


# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.
Expand Down
4 changes: 4 additions & 0 deletions data-prepper-plugins/aws-lambda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ task integrationTest(type: Test) {
systemProperty 'tests.lambda.sink.functionName', System.getProperty('tests.lambda.sink.functionName')
systemProperty 'tests.lambda.sink.sts_role_arn', System.getProperty('tests.lambda.sink.sts_role_arn')

systemProperty 'tests.lambda.processor.region', System.getProperty('tests.lambda.processor.region')
systemProperty 'tests.lambda.processor.functionName', System.getProperty('tests.lambda.processor.functionName')
systemProperty 'tests.lambda.processor.sts_role_arn', System.getProperty('tests.lambda.processor.sts_role_arn')

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package org.opensearch.dataprepper.plugins.lambda.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import static org.mockito.Mockito.when;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.BatchOptions;
import org.opensearch.dataprepper.plugins.lambda.common.config.ThresholdOptions;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

@ExtendWith(MockitoExtension.class)

public class LambdaProcessorServiceIT {

private LambdaClient lambdaClient;
private String functionName;
private String lambdaRegion;
private String role;
private BufferFactory bufferFactory;
@Mock
private LambdaProcessorConfig lambdaProcessorConfig;
@Mock
private BatchOptions batchOptions;
@Mock
private ThresholdOptions thresholdOptions;
@Mock
private AwsAuthenticationOptions awsAuthenticationOptions;
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginSetting pluginSetting;
@Mock
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
@Mock
private ExpressionEvaluator expressionEvaluator;

private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@BeforeEach
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
lambdaRegion = System.getProperty("tests.lambda.processor.region");
functionName = System.getProperty("tests.lambda.processor.functionName");
role = System.getProperty("tests.lambda.processor.sts_role_arn");

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
.region(Region.of(lambdaRegion))
.build();

bufferFactory = new InMemoryBufferFactory();

when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaProcessor.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
}


private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
return new Record<>(event);
}

public LambdaProcessor createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaProcessorConfig lambdaProcessorConfig = objectMapper.readValue(config, LambdaProcessorConfig.class);
return new LambdaProcessor(pluginMetrics,lambdaProcessorConfig,awsCredentialsSupplier,expressionEvaluator);
}

public LambdaProcessor createObjectUnderTest(LambdaProcessorConfig lambdaSinkConfig) throws JsonProcessingException {
return new LambdaProcessor(pluginMetrics,lambdaSinkConfig,awsCredentialsSupplier,expressionEvaluator);
}


private static Collection<Record<Event>> generateRecords(int numberOfRecords) {
List<Record<Event>> recordList = new ArrayList<>();

for (int rows = 1; rows <= numberOfRecords; rows++) {
HashMap<String, String> eventData = new HashMap<>();
eventData.put("name", "Person" + rows);
eventData.put("age", Integer.toString(rows));

Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build());
recordList.add(eventRecord);
}
return recordList;
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_to_lambda_success(final int recordCount) throws Exception {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getMode()).thenReturn("synchronous");

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);

Collection<Record<Event>> recordsData = generateRecords(recordCount);
List<Record<Event>> recordsResult = (List<Record<Event>>) objectUnderTest.doExecute(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

assertEquals(recordsResult.size(),recordCount);
}

@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException {

when(lambdaProcessorConfig.getFunctionName()).thenReturn(functionName);
when(lambdaProcessorConfig.getMaxConnectionRetries()).thenReturn(3);
when(lambdaProcessorConfig.getMode()).thenReturn("synchronous");
when(thresholdOptions.getEventCount()).thenReturn(1);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaProcessorConfig.getBatchOptions()).thenReturn(batchOptions);

LambdaProcessor objectUnderTest = createObjectUnderTest(lambdaProcessorConfig);
Collection<Record<Event>> records = generateRecords(recordCount);
Collection<Record<Event>> recordsResult = objectUnderTest.doExecute(records);
Thread.sleep(Duration.ofSeconds(10).toMillis());
assertEquals(recordsResult.size(),recordCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.lambda.processor;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.lambda.common.config.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.lambda.LambdaClient;

/**
 * Static factory that builds a {@link LambdaClient} from the processor configuration.
 */
public final class LambdaClientFactory {
    private LambdaClientFactory() {
        // static helpers only; not meant to be instantiated
    }

    /**
     * Builds a Lambda client using the region from the configuration's AWS authentication
     * options, credentials obtained from the supplier, and a retry policy based on the
     * configured maximum connection retries.
     *
     * @param lambdaProcessorConfig  processor configuration providing auth options and retry count
     * @param awsCredentialsSupplier supplier used to resolve the credentials provider
     * @return the configured client
     */
    public static LambdaClient createLambdaClient(final LambdaProcessorConfig lambdaProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
        final AwsCredentialsProvider credentialsProvider = awsCredentialsSupplier.getProvider(
                convertToCredentialsOptions(lambdaProcessorConfig.getAwsAuthenticationOptions()));

        return LambdaClient.builder()
                .region(lambdaProcessorConfig.getAwsAuthenticationOptions().getAwsRegion())
                .credentialsProvider(credentialsProvider)
                .overrideConfiguration(createOverrideConfiguration(lambdaProcessorConfig))
                .build();
    }

    /** Wraps the configured retry count in a client override configuration. */
    private static ClientOverrideConfiguration createOverrideConfiguration(final LambdaProcessorConfig config) {
        return ClientOverrideConfiguration.builder()
                .retryPolicy(RetryPolicy.builder()
                        .numRetries(config.getMaxConnectionRetries())
                        .build())
                .build();
    }

    /** Copies region, STS role ARN, external id and header overrides into credentials options. */
    private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions authOptions) {
        return AwsCredentialsOptions.builder()
                .withRegion(authOptions.getAwsRegion())
                .withStsRoleArn(authOptions.getAwsStsRoleArn())
                .withStsExternalId(authOptions.getAwsStsExternalId())
                .withStsHeaderOverrides(authOptions.getAwsStsHeaderOverrides())
                .build();
    }
}
Loading

0 comments on commit 0cf675b

Please sign in to comment.