Add Lambda Synchronous processor support #4700

Open
wants to merge 7 commits into
base: main

Conversation

srikanthjg
Contributor

@srikanthjg srikanthjg commented Jul 1, 2024

Description

Adds AWS Lambda as a remote processor for Data Prepper.
Further details are in #4699.

Issues Resolved

Resolves #4699

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
testImplementation 'junit:junit:4.13.2'
Member

You shouldn't need any of these four lines. They are provided by the root project.

@srikanthjg srikanthjg force-pushed the lambda-processor branch 3 times, most recently from 879671f to 96615fc Compare July 22, 2024 08:00
@srikanthjg srikanthjg force-pushed the lambda-processor branch 2 times, most recently from 67f4a1d to 98b27af Compare August 1, 2024 19:01
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.lambda.LambdaClient;

public final class LambdaClientFactory {
Collaborator

Did you explore the possibility of using one LambdaClientFactory class? I see a class with that name in the lambda sink directory.

Contributor Author

@srikanthjg commented Aug 6, 2024

Sure, I can merge the two and move the result to common.

if (mode != null && mode.equalsIgnoreCase(LambdaProcessorConfig.SYNCHRONOUS_MODE)) {
invocationType = SYNC_INVOCATION_TYPE;
} else {
throw new RuntimeException("mode has to be synchronous or asynchronous");
Collaborator

Something like "unsupported mode {}", mode is a better message here.

@kkondaka
Collaborator

@srikanthjg the WhiteSource check is failing. I am ready to approve this.

@JsonProperty("max_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonProperty("mode")
Member

The term "mode" is quite ambiguous. I think we can borrow the term "invocation_type" from AWS Lambda itself.

https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html#API_Invoke_RequestSyntax

throw new RuntimeException("Unsupported mode " + mode);
}

codec = new LambdaJsonCodec(batchKey);
Member

This approach is very restrictive. It assumes that the body fits the format we ask. We should follow the same pattern used elsewhere in Data Prepper by allowing for a pluggable output codec.

Contributor Author

@srikanthjg commented Aug 15, 2024

LambdaJsonCodec is an implementation of OutputCodec (link). I needed this specifically for batch processing, when more than one event needs to be mapped to JSON. JsonOutputCodec handles only one event at a time. Maybe I can change the name to something more generic, like BulkJsonOutputCodec or BatchJsonOutputCodec? It will be used for all dial-out processors. In the S3 sink we use BufferedCodec, but the only implementation is currently Parquet, and the way we want to implement this for Lambda is different.

Member

All OutputCodecs are made for batches.

Customers can use either JsonOutputCodec or NdjsonOutputCodec.

The json codec already supports a configurable key name. This would replace the batch_key which you don't need.

codec:
  json:
    key_name: myKey

Member

So the customer can have two options:

  1. JSON
codec:
  json:
    key_name: myKey

Yields:

{
  "myKey" : [
     { ...event1... },
     { ...event2... },
     { ...event3... }
  ]
}
  2. The customer can use ndjson
codec:
  ndjson:

Yields:

{ ...event1... }
{ ...event2... }
{ ...event3... }
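
To make the two shapes above concrete, here is a minimal sketch in plain Java. It is not the actual JsonOutputCodec or NdjsonOutputCodec; the class `PayloadShapes` and its methods are hypothetical, the events are assumed to be already serialized to JSON strings, and the key name is assumed to need no escaping.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration only; not part of Data Prepper.
public final class PayloadShapes {
    private PayloadShapes() { }

    /** Wraps already-serialized JSON events in one object: {"keyName":[e1,e2,...]}. */
    public static String asKeyedJsonArray(final String keyName, final List<String> jsonEvents) {
        final StringJoiner array = new StringJoiner(",", "[", "]");
        jsonEvents.forEach(array::add);
        // Assumption: keyName contains no characters that need JSON escaping.
        return "{\"" + keyName + "\":" + array + "}";
    }

    /** Writes one serialized event per line, each terminated by a newline. */
    public static String asNdjson(final List<String> jsonEvents) {
        final StringBuilder out = new StringBuilder();
        for (final String event : jsonEvents) {
            out.append(event).append('\n');
        }
        return out.toString();
    }

    public static void main(final String[] args) {
        final List<String> events = List.of("{\"id\":1}", "{\"id\":2}");
        System.out.println(asKeyedJsonArray("myKey", events));
        System.out.print(asNdjson(events));
    }
}
```

In this sketch the keyed form is always a single JSON document, while the newline-delimited form is a single JSON document only when it holds exactly one event.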

Contributor Author

@srikanthjg commented Sep 9, 2024

A bulk request will always need a key because it is treated as one payload, so I guess ndjson cannot be used.

There is also a difference when handling a single event without batching. In that case I still want to convert the Data Prepper event to JSON, but I don't want a key; I want to pass the user's data as-is to Lambda as the payload. The current output codec forces me to have a key. To address that, I either need to add new behavior to the json codec's writeEvent method so that it converts the event directly to JSON, or write a new codec (which is what I did).

The behavior I want seems to be a combination of the two codecs, ndjson and json: json behavior for bulk and ndjson behavior for a single event.

Member

@srikanthjg ,

> Bulk will always need a key as it will be considered one payload, so i guess ndJson cannot be used.

Yes, this makes sense. The payload needs to be JSON and ND-JSON with multiple events becomes non-JSON.

> There is also a difference when it comes to handling single event without batch.

Actually, an ndjson which writes a single event gives you exactly what you want in this case. It is exactly the same output.

I also see that you are trying to support the concept of calling a Lambda for each event. Improving the configuration can help make this clearer. Right now there are multiple configurations which the user needs to carefully set to get the desired output.

This is a simpler way to configure it.

  1. To have a single invocation per event, add a boolean flag. The user need not make any more decisions.
aws_lambda:
  function_name: MyFunction
  invocation_per_event: true
  2. The default should be to batch, and this can have the existing defaults. You can probably keep this configuration the same, though rename batch_key to key_name for consistency with the other APIs. Also, disallow setting an event size of 1, as this is not the goal of this approach.
aws_lambda:
  function_name: MyFunction

Second, you can still use the existing codecs.

When invocation_per_event is set to true, you can use the NdjsonOutputCodec internally. Otherwise, use the JsonOutputCodec and provide the batch.key_name as the keyName in the codec configuration.
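
The suggested configuration can be sketched roughly as follows. This is a plain-Java illustration, not Data Prepper code: `PayloadPlanner`, `Settings`, and `eventCountThreshold` are hypothetical names, events are assumed to be pre-serialized JSON strings, and "event size of 1" is interpreted here as a batch event-count threshold of 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; names and validation rule are assumptions.
public final class PayloadPlanner {
    private PayloadPlanner() { }

    /** Stand-in for the processor configuration discussed in the comment above. */
    public static final class Settings {
        final boolean invocationPerEvent;
        final String keyName;
        final int eventCountThreshold;

        public Settings(final boolean invocationPerEvent, final String keyName, final int eventCountThreshold) {
            // Batching with a threshold of 1 is rejected; per-event mode covers that case.
            if (!invocationPerEvent && eventCountThreshold < 2) {
                throw new IllegalArgumentException(
                        "event count threshold must be at least 2 when batching; use invocation_per_event instead");
            }
            this.invocationPerEvent = invocationPerEvent;
            this.keyName = keyName;
            this.eventCountThreshold = eventCountThreshold;
        }
    }

    /** Returns one payload per Lambda invocation. */
    public static List<String> buildPayloads(final Settings settings, final List<String> jsonEvents) {
        if (settings.invocationPerEvent) {
            // Each event is sent unchanged as its own payload, with no wrapping key.
            return List.copyOf(jsonEvents);
        }
        final List<String> payloads = new ArrayList<>();
        for (int i = 0; i < jsonEvents.size(); i += settings.eventCountThreshold) {
            final int end = Math.min(i + settings.eventCountThreshold, jsonEvents.size());
            final List<String> chunk = jsonEvents.subList(i, end);
            payloads.add("{\"" + settings.keyName + "\":[" + String.join(",", chunk) + "]}");
        }
        return payloads;
    }
}
```

With a single boolean deciding the mode, the user does not have to combine several batch options to get one invocation per event.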

Contributor Author

Regarding the configuration, I already have "invocation_type" as a configuration option; it allows setting per-event invocation or batch invocation (RequestResponse or Event).

If we implement this internally, I cannot use parse-json-processor as a plugin and will have to take a dependency on it. Is it OK for one processor to take a dependency on another?

I wanted to avoid this, so I implemented a custom codec. I think this codec can eventually be used by other dial-out processors as well, and I can make it generic.


public class LambdaProcessorConfig {

public static final String SYNCHRONOUS_MODE = "RequestResponse";
Member

We should stick with Data Prepper naming conventions: request_response.

public class LambdaProcessorConfig {

public static final String SYNCHRONOUS_MODE = "RequestResponse";
public static final String ASYNCHRONOUS_MODE = "Event";
Member

Change to event.


public class LambdaProcessorConfig {

public static final String REQUEST_RESPONSE = "RequestResponse";
Member

Let's use request-response to match our existing naming conventions.

public class LambdaProcessorConfig {

public static final String REQUEST_RESPONSE = "RequestResponse";
public static final String EVENT = "Event";
Member

Let's use event to match our existing naming conventions.

@@ -18,10 +18,12 @@

public class LambdaSinkConfig {

public static final String REQUEST_RESPONSE = "RequestResponse";
Member

Let's consolidate these constant values with the LambdaProcessorConfig so that they don't diverge.

public static final String EVENT = "Event";
public static final String BATCH_EVENT = "batch_event";
public static final String SINGLE_EVENT = "single_event";
public static final String REQUEST_RESPONSE = "request-response";
Member

Perhaps make a CommonLambdaConfig class that has these constants. We should avoid duplicating these or we may have future mismatches.
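
One way to keep the sink and processor from diverging is a single holder for the option values plus a lookup to the values the Lambda Invoke API expects. The sketch below is an assumption, not the class from this PR: `SharedLambdaOptions` and `toAwsInvocationType` are hypothetical names, and the lowercase `event` value follows the naming requested in the earlier comments. Only `RequestResponse` and `Event` are taken from the AWS Lambda API.

```java
import java.util.Map;

// Hypothetical illustration only; not the class added in this PR.
public final class SharedLambdaOptions {
    // User-facing configuration values, following Data Prepper naming conventions.
    public static final String REQUEST_RESPONSE = "request-response";
    public static final String EVENT = "event";

    // Static lookup from configuration value to the AWS Lambda InvocationType value.
    private static final Map<String, String> AWS_INVOCATION_TYPES = Map.of(
            REQUEST_RESPONSE, "RequestResponse",
            EVENT, "Event");

    private SharedLambdaOptions() { }

    /** Translates a configured value, rejecting anything unsupported with a descriptive message. */
    public static String toAwsInvocationType(final String configured) {
        // Map.of(...) rejects null keys on lookup, so guard against null first.
        final String awsValue = configured == null ? null : AWS_INVOCATION_TYPES.get(configured);
        if (awsValue == null) {
            throw new IllegalArgumentException("Unsupported invocation type " + configured);
        }
        return awsValue;
    }
}
```

Both plugin configurations could then reference the same constants and fail at the same point with the same message.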

dlvenable previously approved these changes Sep 13, 2024
Member

@dlvenable left a comment

Thank you @srikanthjg for this contribution!

dlvenable previously approved these changes Sep 16, 2024
maximum_size: 3mb
```

`invocation_type` as RequestResponse will be used when the response from aws lambda comes back to dataprepper.
Member

nit:
invocation_type as RequestResponse is used when DataPrepper needs to process the response from AWS Lambda.

invocation_type as Event is used when the response from AWS Lambda is sent to an S3 bucket.


In batch options, an implicit batch threshold option is that if events size is 3mb, we flush it.
`payload_model` this is used to define how the payload should be constructed from a dataprepper event.
`payload_model` as batch_event is used when the output needs to be formed as a batch of multiple events,
Member

Are there other values for payload_model?

Contributor Author

fixed


@ParameterizedTest
@ValueSource(ints = {1,3})
void verify_records_to_lambda_success(final int recordCount) throws Exception {
Member

Consider adding a test for InvocationType Event.

Contributor Author

Invocation type Event will be disabled for now. We will release the Event type with asynchronous support, which requires additional infrastructure changes. I have disabled it in the verification for now and will fix the README.

final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

return LambdaClient.builder()
.region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.region(awsAuthenticationOptions.getAwsRegion())
Member

Consider enabling SDK metrics to track the number of requests, timeouts, throttles, etc.

@JsonProperty("batch_key")
private String batchKey = DEFAULT_BATCH_KEY;
@JsonProperty("key_name")
@Size(min = 1, max = 2048)
Member

Is the max value on key_name consistent with the other processors across Data Prepper?

Contributor Author

yes

codec = new NdjsonOutputCodec(ndjsonOutputCodecConfig);
isBatchEnabled = false;
} else{
throw new RuntimeException("invalid payload_model option");
Member

Can this validation be part of LambdaProcessorConfig?


if(!lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.EVENT) &&
!lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.REQUEST_RESPONSE)){
throw new RuntimeException("Unsupported invocation type " + lambdaProcessorConfig.getInvocationType());
Member

same as above

if (currentBuffer.getEventCount() == 0) {
codec.start(currentBuffer.getOutputStream(), event, codecContext);
}
codec.writeEvent(event, currentBuffer.getOutputStream());
Member

Is currentBuffer thread-safe?

Contributor Author

Yes, this is running in the context of a processor.


}

void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {
Member

This can be a private function.


void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {

LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
Member

Is there excessive logging in this method?

Contributor Author

Sure, I will reduce the logging.

}
}

LambdaResult retryFlushToLambda(Buffer currentBuffer, final AtomicReference<String> errorMsgObj) throws InterruptedException {
Member

This can be a private function.

return lambdaResult;
}

Event convertLambdaResponseToEvent(InvokeResponse lambdaResponse) {
Member

This can be a private function.

Comment on lines 120 to 122
Map<String, String> invocationTypeMap = Map.of(
LambdaCommonConfig.EVENT, EVENT_LAMBDA
);
Member

Move this to a static constant.

Comment on lines +151 to +146
this.bufferFactory = new InMemoryBufferFactory();
try {
currentBuffer = this.bufferFactory.getBuffer(lambdaClient, functionName, invocationType);
Member

The term "buffer" is overloaded in Data Prepper. This looks like it is not just a buffer but is tightly coupled with Lambda. We should consider renaming this class and interface to make that clear.

Contributor Author

I am handling this the same way we do in the sink. I can address the refactor in another PR.

} catch (AwsServiceException | SdkClientException e) {
errorMsgObj.set(e.getMessage());
LOG.error("Exception occurred while uploading records to lambda. Retry countdown : {} | exception:", retryCount, e);
--retryCount;
Member

Is this retry on top of the Lambda client's retry? Is there a reason we need it?
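
The concern with layering one retry loop over another can be shown with a small simulation. This sketch does not use the AWS SDK: `NestedRetryDemo` and its methods are hypothetical, the attempt counts are arbitrary illustrative numbers rather than SDK defaults, and backoff is omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration only; no real client or backoff is involved.
public final class NestedRetryDemo {
    private NestedRetryDemo() { }

    /** Calls the action up to maxAttempts times and returns true on the first success. */
    public static boolean withRetries(final int maxAttempts, final Supplier<Boolean> action) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (action.get()) {
                return true;
            }
        }
        return false;
    }

    /** Counts the raw calls made when an outer retry loop wraps an inner one and every call fails. */
    public static int countCallsWhenAlwaysFailing(final int outerAttempts, final int innerAttempts) {
        final AtomicInteger calls = new AtomicInteger();
        withRetries(outerAttempts, () -> withRetries(innerAttempts, () -> {
            calls.incrementAndGet();
            return false; // simulate a request that never succeeds
        }));
        return calls.get();
    }

    public static void main(final String[] args) {
        // 3 outer attempts, each wrapping 4 inner attempts.
        System.out.println(countCallsWhenAlwaysFailing(3, 4)); // prints 12
    }
}
```

When every call fails, the attempts multiply rather than add, which is why an application-level loop on top of a client that already retries deserves a stated reason.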

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Development

Successfully merging this pull request may close these issues.

AWS Lambda as Synchronous Processor
4 participants