Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): msk and self-managed kafka event sources #12507

Merged
merged 59 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
40bfee4
Add SourceAccessConfigurations to event-source-mapping.ts
bracki Jan 14, 2021
ad0f9a6
Implement Kafka event sources
bracki Jan 14, 2021
1f510d6
Add test for ManagedKafkaEventSource
bracki Jan 14, 2021
af073f9
Enhance test
bracki Jan 14, 2021
410ac08
Add test for SelfManagedKafkaEventSource
bracki Jan 14, 2021
2d6bccc
Add section to README.md
bracki Jan 14, 2021
f742a93
Add docstrings
bracki Jan 15, 2021
3dd9628
Apply suggestions from code review for README
bracki Jan 18, 2021
a896cf7
Remove obsolete eslint rule
bracki Jan 18, 2021
f94133c
Remove obsolete eslint rule - and fix it
bracki Jan 18, 2021
c967923
Document default value
bracki Jan 18, 2021
4ee85c8
Rename to kafkaSecret and change type
bracki Jan 18, 2021
f372c2f
Revert "Rename to kafkaSecret and change type"
bracki Jan 18, 2021
ac6a393
Refactor to use props
bracki Jan 18, 2021
08a0ba1
Remove doc error whitelisting
bracki Jan 18, 2021
979414f
It needs to be SASL_SCRAM_512_AUTH
bracki Jan 19, 2021
2e06917
Change EventSourceMappingOptions
bracki Jan 19, 2021
1674f48
Add support for self managed Kafka running in VPC
bracki Jan 19, 2021
036174b
Add test and error handling for VPC stuff
bracki Jan 19, 2021
f651636
Fix error
bracki Jan 19, 2021
315d56f
Unique identifier
bracki Jan 19, 2021
3820968
Fix lambda error and rename
bracki Jan 19, 2021
9199f49
Flatten to kafkaBootstrapServers
bracki Feb 11, 2021
265f98f
Introduce SourceAccessConfigurationType
bracki Feb 11, 2021
dc8ec24
Make authenticationMethod configurable
bracki Feb 11, 2021
eb72439
Take care of clusterArn being a token
bracki Feb 11, 2021
eba7e2f
Docs
bracki Feb 11, 2021
4278cb9
Refactor to take vpc and subnet selection
bracki Feb 11, 2021
a549a6b
Get rid of generics
bracki Feb 12, 2021
3cb7bb6
Update packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
bracki Feb 16, 2021
80fd8d5
Update packages/@aws-cdk/aws-lambda-event-sources/test/test.kafka.ts
bracki Feb 16, 2021
407190d
Update packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
bracki Feb 16, 2021
91064e7
Update packages/@aws-cdk/aws-lambda-event-sources/README.md
bracki Feb 16, 2021
fb47c96
Update packages/@aws-cdk/aws-lambda-event-sources/README.md
bracki Feb 16, 2021
ba3ebde
Fix indentation
bracki Feb 16, 2021
1f47f77
Add static method to create custom variants
bracki Feb 16, 2021
859c9d9
Introduce AuthenticationMethod enum
bracki Feb 16, 2021
b7d6b1a
Introduce ICluster/Cluster in aws-msk
bracki Feb 16, 2021
545270a
Fix error handling when VPC is provided
bracki Feb 16, 2021
5719072
Make use of Cluster from aws-msk
bracki Feb 16, 2021
7f5852f
Update packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
bracki Mar 4, 2021
f44c398
Update packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
bracki Mar 4, 2021
fd318c8
Remove verbose docs
bracki Mar 4, 2021
439e0c6
Update packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
bracki Mar 4, 2021
c0d6006
Fix typo
bracki Mar 4, 2021
4a18ce7
Apply feedback
bracki Mar 4, 2021
b2c9145
Extract method
bracki Mar 4, 2021
87dcaec
Update packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
bracki Mar 4, 2021
8d5d938
Reorganize
bracki Mar 4, 2021
b6be30a
rename tests
bracki Mar 4, 2021
14c0876
Fix and extract
bracki Mar 4, 2021
838d43f
Add more tests
bracki Mar 4, 2021
a177e0d
Remove obsolete dependency
bracki Mar 4, 2021
6f451cf
Apply feedback
bracki Mar 16, 2021
be2b531
Rearrange tests
bracki Mar 16, 2021
2bae12c
Merge branch 'master' into kafka-lambda-event-source
mergify[bot] Mar 16, 2021
3054940
Merge branch 'master' into kafka-lambda-event-source
mergify[bot] Mar 16, 2021
5793e65
Merge branch 'master' into kafka-lambda-event-source
mergify[bot] Mar 16, 2021
5506cc1
Merge branch 'master' into kafka-lambda-event-source
mergify[bot] Mar 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,67 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
}));
```

## Kafka

You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.

The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
import * as msk from '@aws-cdk/aws-lambda';
import { Secret } from '@aws-cdk/aws-secretmanager';
import { ManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';

// Your MSK cluster
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4');

// The Kafka topic you want to subscribe to
const topic = 'some-cool-topic'

// The secret that allows access to your MSK cluster
// You still have to make sure that it is associated with your cluster as described in the documentation
const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });

myFunction.addEventSource(new ManagedKafkaEventSource({
cluster: cluster,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON
}));
```

The following code sets up a self managed Kafka cluster as an event source. Username and password based authentication
will need to be set up as described in [Managing access and permissions](https://docs.aws.amazon.com/lambda/latest/dg/smaa-permissions.html#smaa-permissions-add-secret).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
import { Secret } from '@aws-cdk/aws-secretmanager';
import { SelfManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';

// The list of Kafka brokers
const bootstrapServers = ['kafka-broker:9092']

// The Kafka topic you want to subscribe to
const topic = 'some-cool-topic'

// The secret that allows access to your self hosted Kafka cluster
const secret = new Secret(this, 'Secret', { ... });

myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON
}));
```

If your self managed Kafka cluster is only reachable via VPC also configure `vpc` `vpcSubnets` and `securityGroup`.

## Roadmap

Eventually, this module will support all the event sources described under
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './api';
export * from './dynamodb';
export * from './kafka';
export * from './kinesis';
export * from './s3';
export * from './sns';
Expand Down
194 changes: 194 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import * as crypto from 'crypto';
import { ISecurityGroup, IVpc, SubnetSelection } from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import * as msk from '@aws-cdk/aws-msk';
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
import { Stack } from '@aws-cdk/core';
import { StreamEventSource, StreamEventSourceProps } from './stream';

// keep this import separate from other imports to reduce chance for merge conflicts with v2-main
// eslint-disable-next-line no-duplicate-imports, import/order
import { Construct } from '@aws-cdk/core';

/**
* Properties for a Kafka event source
*/
export interface KafkaEventSourceProps extends StreamEventSourceProps {
/**
* the Kafka topic to subscribe to
*/
readonly topic: string,
/**
* the secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details
*/
readonly secret: secretsmanager.ISecret
}

/**
* Properties for a MSK event source
*/
export interface ManagedKafkaEventSourceProps extends KafkaEventSourceProps {
/**
* an MSK cluster construct
*/
readonly cluster: msk.ICluster
}

/**
* The authentication method to use with SelfManagedKafkaEventSource
*/
export enum AuthenticationMethod {
/**
* SASL_SCRAM_512_AUTH authentication method for your Kafka cluster
*/
SASL_SCRAM_512_AUTH = 'SASL_SCRAM_512_AUTH',
/**
* SASL_SCRAM_256_AUTH authentication method for your Kafka cluster
*/
SASL_SCRAM_256_AUTH = 'SASL_SCRAM_256_AUTH',
}

/**
* Properties for a self managed Kafka cluster event source.
* If your Kafka cluster is only reachable via VPC make sure to configure it.
*/
export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps {
/**
* The list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that
* a Kafka client connects to initially to bootstrap itself. They are in the format `abc.xyz.com:xxxx`.
*/
readonly bootstrapServers: string[]

/**
* If your Kafka brokers are only reachable via VPC provide the VPC here
*
* @default none
*/
readonly vpc?: IVpc;

/**
* If your Kafka brokers are only reachable via VPC, provide the subnets selection here
*
* @default - none, required if setting vpc
*/
readonly vpcSubnets?: SubnetSelection,

/**
* If your Kafka brokers are only reachable via VPC, provide the security group here
*
* @default - none, required if setting vpc
*/
readonly securityGroup?: ISecurityGroup

/**
* The authentication method for your Kafka cluster
*
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
*/
readonly authenticationMethod?: AuthenticationMethod
}

/**
* Use a MSK cluster as a streaming source for AWS Lambda
*/
export class ManagedKafkaEventSource extends StreamEventSource {
// This is to work around JSII inheritance problems
private innerProps: ManagedKafkaEventSourceProps;

constructor(props: ManagedKafkaEventSourceProps) {
nija-at marked this conversation as resolved.
Show resolved Hide resolved
super(props);
this.innerProps = props;
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(
`KafkaEventSource:${this.innerProps.cluster.clusterArn}${this.innerProps.topic}`,
this.enrichMappingOptions({
eventSourceArn: this.innerProps.cluster.clusterArn,
startingPosition: this.innerProps.startingPosition,
// From https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-limitations, "Amazon MSK only supports SCRAM-SHA-512 authentication."
sourceAccessConfigurations: [{ type: lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH, uri: this.innerProps.secret.secretArn }],
bracki marked this conversation as resolved.
Show resolved Hide resolved
kafkaTopic: this.innerProps.topic,
}),
);

this.innerProps.secret.grantRead(target);

target.addToRolePolicy(new iam.PolicyStatement(
{
actions: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers', 'kafka:ListScramSecrets'],
resources: [this.innerProps.cluster.clusterArn],
},
));

target.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaMSKExecutionRole'));
}
}

/**
* Use a self hosted Kafka installation as a streaming source for AWS Lambda.
*/
export class SelfManagedKafkaEventSource extends StreamEventSource {
// This is to work around JSII inheritance problems
private innerProps: SelfManagedKafkaEventSourceProps;

constructor(props: SelfManagedKafkaEventSourceProps) {
nija-at marked this conversation as resolved.
Show resolved Hide resolved
super(props);
if (props.vpc) {
if (!props.securityGroup) {
throw new Error('securityGroup must be set when providing vpc');
}
if (!props.vpcSubnets) {
throw new Error('vpcSubnets must be set when providing vpc');
}
}
this.innerProps = props;
}

public bind(target: lambda.IFunction) {
if (!Construct.isConstruct(target)) { throw new Error('Function is not a construct. Unexpected error.'); }
target.addEventSourceMapping(
this.mappingId(target),
this.enrichMappingOptions({
kafkaBootstrapServers: this.innerProps.bootstrapServers,
kafkaTopic: this.innerProps.topic,
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
}),
);
this.innerProps.secret.grantRead(target);
}

private mappingId(target: lambda.IFunction) {
let hash = crypto.createHash('md5');
hash.update(JSON.stringify(Stack.of(target).resolve(this.innerProps.bootstrapServers)));
const idHash = hash.digest('hex');
return `KafkaEventSource:${idHash}:${this.innerProps.topic}`;
}

private sourceAccessConfigurations() {
let authType;
switch (this.innerProps.authenticationMethod) {
case AuthenticationMethod.SASL_SCRAM_256_AUTH:
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_256_AUTH;
break;
case AuthenticationMethod.SASL_SCRAM_512_AUTH:
default:
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH;
break;
}
let sourceAccessConfigurations = [{ type: authType, uri: this.innerProps.secret.secretArn }];
if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
uri: this.innerProps.securityGroup.securityGroupId,
},
);
this.innerProps.vpc?.selectSubnets(this.innerProps.vpcSubnets).subnetIds.forEach((id) => {
sourceAccessConfigurations.push({ type: lambda.SourceAccessConfigurationType.VPC_SUBNET, uri: id });
});
}
return sourceAccessConfigurations;
}
}
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Duration } from '@aws-cdk/core';

/**
* The set of properties for event sources that follow the streaming model,
* such as, Dynamo and Kinesis.
* such as, Dynamo, Kinesis and Kafka.
*/
export interface StreamEventSourceProps {
/**
Expand Down
6 changes: 6 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@
"dependencies": {
"@aws-cdk/aws-apigateway": "0.0.0",
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-msk": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-notifications": "0.0.0",
"@aws-cdk/aws-secretsmanager": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
Expand All @@ -88,12 +91,15 @@
"peerDependencies": {
"@aws-cdk/aws-apigateway": "0.0.0",
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-msk": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-notifications": "0.0.0",
"@aws-cdk/aws-secretsmanager": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
Expand Down
Loading