diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index 5e1578d9aefd4..eb4f206c8f3c9 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -207,6 +207,67 @@ myFunction.addEventSource(new KinesisEventSource(stream, { })); ``` +## Kafka + +You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. + +The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the +MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html). + +```ts +import * as lambda from '@aws-cdk/aws-lambda'; +import * as msk from '@aws-cdk/aws-lambda'; +import { Secret } from '@aws-cdk/aws-secretmanager'; +import { ManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources'; + +// Your MSK cluster +const cluster = msk.Cluster.fromClusterArn(this, 'Cluster', + 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'); + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic' + +// The secret that allows access to your MSK cluster +// You still have to make sure that it is associated with your cluster as described in the documentation +const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + +myFunction.addEventSource(new ManagedKafkaEventSource({ + cluster: cluster, + topic: topic, + secret: secret, + batchSize: 100, // default + startingPosition: lambda.StartingPosition.TRIM_HORIZON +})); +``` + +The following code sets up a self managed Kafka cluster as an event source. Username and password based authentication +will need to be set up as described in [Managing access and permissions](https://docs.aws.amazon.com/lambda/latest/dg/smaa-permissions.html#smaa-permissions-add-secret). + +```ts +import * as lambda from '@aws-cdk/aws-lambda'; +import { Secret } from '@aws-cdk/aws-secretmanager'; +import { SelfManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources'; + +// The list of Kafka brokers +const bootstrapServers = ['kafka-broker:9092'] + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic' + +// The secret that allows access to your self hosted Kafka cluster +const secret = new Secret(this, 'Secret', { ... }); + +myFunction.addEventSource(new SelfManagedKafkaEventSource({ + bootstrapServers: bootstrapServers, + topic: topic, + secret: secret, + batchSize: 100, // default + startingPosition: lambda.StartingPosition.TRIM_HORIZON +})); +``` + +If your self managed Kafka cluster is only reachable via VPC also configure `vpc` `vpcSubnets` and `securityGroup`. + ## Roadmap Eventually, this module will support all the event sources described under diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts index 19253a743cae8..555137e51afbf 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts @@ -1,5 +1,6 @@ export * from './api'; export * from './dynamodb'; +export * from './kafka'; export * from './kinesis'; export * from './s3'; export * from './sns'; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts new file mode 100644 index 0000000000000..f0782964d93ce --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts @@ -0,0 +1,194 @@ +import * as crypto from 'crypto'; +import { ISecurityGroup, IVpc, SubnetSelection } from '@aws-cdk/aws-ec2'; +import * as iam from '@aws-cdk/aws-iam'; +import * as lambda from '@aws-cdk/aws-lambda'; +import * as msk from '@aws-cdk/aws-msk'; +import * as secretsmanager from '@aws-cdk/aws-secretsmanager'; +import { Stack } from '@aws-cdk/core'; +import { StreamEventSource, StreamEventSourceProps } from './stream'; + +// keep this import separate from other imports to reduce chance for merge conflicts with v2-main +// eslint-disable-next-line no-duplicate-imports, import/order +import { Construct } from '@aws-cdk/core'; + +/** + * Properties for a Kafka event source + */ +export interface KafkaEventSourceProps extends StreamEventSourceProps { + /** + * the Kafka topic to subscribe to + */ + readonly topic: string, + /** + * the secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details + */ + readonly secret: secretsmanager.ISecret +} + +/** + * Properties for a MSK event source + */ +export interface ManagedKafkaEventSourceProps extends KafkaEventSourceProps { + /** + * an MSK cluster construct + */ + readonly cluster: msk.ICluster +} + +/** + * The authentication method to use with SelfManagedKafkaEventSource + */ +export enum AuthenticationMethod { + /** + * SASL_SCRAM_512_AUTH authentication method for your Kafka cluster + */ + SASL_SCRAM_512_AUTH = 'SASL_SCRAM_512_AUTH', + /** + * SASL_SCRAM_256_AUTH authentication method for your Kafka cluster + */ + SASL_SCRAM_256_AUTH = 'SASL_SCRAM_256_AUTH', +} + +/** + * Properties for a self managed Kafka cluster event source. + * If your Kafka cluster is only reachable via VPC make sure to configure it. + */ +export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps { + /** + * The list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that + * a Kafka client connects to initially to bootstrap itself. They are in the format `abc.xyz.com:xxxx`. + */ + readonly bootstrapServers: string[] + + /** + * If your Kafka brokers are only reachable via VPC provide the VPC here + * + * @default none + */ + readonly vpc?: IVpc; + + /** + * If your Kafka brokers are only reachable via VPC, provide the subnets selection here + * + * @default - none, required if setting vpc + */ + readonly vpcSubnets?: SubnetSelection, + + /** + * If your Kafka brokers are only reachable via VPC, provide the security group here + * + * @default - none, required if setting vpc + */ + readonly securityGroup?: ISecurityGroup + + /** + * The authentication method for your Kafka cluster + * + * @default AuthenticationMethod.SASL_SCRAM_512_AUTH + */ + readonly authenticationMethod?: AuthenticationMethod +} + +/** + * Use a MSK cluster as a streaming source for AWS Lambda + */ +export class ManagedKafkaEventSource extends StreamEventSource { + // This is to work around JSII inheritance problems + private innerProps: ManagedKafkaEventSourceProps; + + constructor(props: ManagedKafkaEventSourceProps) { + super(props); + this.innerProps = props; + } + + public bind(target: lambda.IFunction) { + target.addEventSourceMapping( + `KafkaEventSource:${this.innerProps.cluster.clusterArn}${this.innerProps.topic}`, + this.enrichMappingOptions({ + eventSourceArn: this.innerProps.cluster.clusterArn, + startingPosition: this.innerProps.startingPosition, + // From https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-limitations, "Amazon MSK only supports SCRAM-SHA-512 authentication." + sourceAccessConfigurations: [{ type: lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH, uri: this.innerProps.secret.secretArn }], + kafkaTopic: this.innerProps.topic, + }), + ); + + this.innerProps.secret.grantRead(target); + + target.addToRolePolicy(new iam.PolicyStatement( + { + actions: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers', 'kafka:ListScramSecrets'], + resources: [this.innerProps.cluster.clusterArn], + }, + )); + + target.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaMSKExecutionRole')); + } +} + +/** + * Use a self hosted Kafka installation as a streaming source for AWS Lambda. + */ +export class SelfManagedKafkaEventSource extends StreamEventSource { + // This is to work around JSII inheritance problems + private innerProps: SelfManagedKafkaEventSourceProps; + + constructor(props: SelfManagedKafkaEventSourceProps) { + super(props); + if (props.vpc) { + if (!props.securityGroup) { + throw new Error('securityGroup must be set when providing vpc'); + } + if (!props.vpcSubnets) { + throw new Error('vpcSubnets must be set when providing vpc'); + } + } + this.innerProps = props; + } + + public bind(target: lambda.IFunction) { + if (!Construct.isConstruct(target)) { throw new Error('Function is not a construct. Unexpected error.'); } + target.addEventSourceMapping( + this.mappingId(target), + this.enrichMappingOptions({ + kafkaBootstrapServers: this.innerProps.bootstrapServers, + kafkaTopic: this.innerProps.topic, + startingPosition: this.innerProps.startingPosition, + sourceAccessConfigurations: this.sourceAccessConfigurations(), + }), + ); + this.innerProps.secret.grantRead(target); + } + + private mappingId(target: lambda.IFunction) { + let hash = crypto.createHash('md5'); + hash.update(JSON.stringify(Stack.of(target).resolve(this.innerProps.bootstrapServers))); + const idHash = hash.digest('hex'); + return `KafkaEventSource:${idHash}:${this.innerProps.topic}`; + } + + private sourceAccessConfigurations() { + let authType; + switch (this.innerProps.authenticationMethod) { + case AuthenticationMethod.SASL_SCRAM_256_AUTH: + authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_256_AUTH; + break; + case AuthenticationMethod.SASL_SCRAM_512_AUTH: + default: + authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH; + break; + } + let sourceAccessConfigurations = [{ type: authType, uri: this.innerProps.secret.secretArn }]; + if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) { + sourceAccessConfigurations.push({ + type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP, + uri: this.innerProps.securityGroup.securityGroupId, + }, + ); + this.innerProps.vpc?.selectSubnets(this.innerProps.vpcSubnets).subnetIds.forEach((id) => { + sourceAccessConfigurations.push({ type: lambda.SourceAccessConfigurationType.VPC_SUBNET, uri: id }); + }); + } + return sourceAccessConfigurations; + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts index d18eaaf3f947c..96907b97835fc 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts @@ -3,7 +3,7 @@ import { Duration } from '@aws-cdk/core'; /** * The set of properties for event sources that follow the streaming model, - * such as, Dynamo and Kinesis. + * such as, Dynamo, Kinesis and Kafka. */ export interface StreamEventSourceProps { /** diff --git a/packages/@aws-cdk/aws-lambda-event-sources/package.json b/packages/@aws-cdk/aws-lambda-event-sources/package.json index 20112e1029cbf..33906c1f5219e 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/package.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/package.json @@ -72,12 +72,15 @@ "dependencies": { "@aws-cdk/aws-apigateway": "0.0.0", "@aws-cdk/aws-dynamodb": "0.0.0", + "@aws-cdk/aws-ec2": "0.0.0", "@aws-cdk/aws-events": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", + "@aws-cdk/aws-msk": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-s3-notifications": "0.0.0", + "@aws-cdk/aws-secretsmanager": "0.0.0", "@aws-cdk/aws-sns": "0.0.0", "@aws-cdk/aws-sns-subscriptions": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", @@ -88,12 +91,15 @@ "peerDependencies": { "@aws-cdk/aws-apigateway": "0.0.0", "@aws-cdk/aws-dynamodb": "0.0.0", + "@aws-cdk/aws-ec2": "0.0.0", "@aws-cdk/aws-events": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", + "@aws-cdk/aws-msk": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", "@aws-cdk/aws-s3-notifications": "0.0.0", + "@aws-cdk/aws-secretsmanager": "0.0.0", "@aws-cdk/aws-sns": "0.0.0", "@aws-cdk/aws-sns-subscriptions": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.kafka.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kafka.ts new file mode 100644 index 0000000000000..9cf1af9e50507 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kafka.ts @@ -0,0 +1,334 @@ +import { arrayWith, expect, haveResource } from '@aws-cdk/assert'; +import { SecurityGroup, SubnetType, Vpc } from '@aws-cdk/aws-ec2'; +import * as lambda from '@aws-cdk/aws-lambda'; +import * as msk from '@aws-cdk/aws-msk'; +import { Secret } from '@aws-cdk/aws-secretsmanager'; +import * as cdk from '@aws-cdk/core'; +import { Test } from 'nodeunit'; +import * as sources from '../lib'; +import { TestFunction } from './test-function'; + +export = { + 'msk': { + 'default'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const cluster = msk.Cluster.fromClusterArn(stack, 'Cluster', 'some-arn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + + // WHEN + fn.addEventSource(new sources.ManagedKafkaEventSource( + { + cluster: cluster, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + })); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'secretsmanager:GetSecretValue', + 'secretsmanager:DescribeSecret', + ], + Effect: 'Allow', + Resource: { + Ref: 'SecretA720EF05', + }, + }, + { + Action: [ + 'kafka:DescribeCluster', + 'kafka:GetBootstrapBrokers', + 'kafka:ListScramSecrets', + ], + Effect: 'Allow', + Resource: cluster.clusterArn, + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'FnServiceRoleDefaultPolicyC6A839BF', + Roles: [ + { + Ref: 'FnServiceRoleB9001A96', + }, + ], + })); + + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + EventSourceArn: cluster.clusterArn, + FunctionName: { + Ref: 'Fn9270CBC0', + }, + BatchSize: 100, + StartingPosition: 'TRIM_HORIZON', + Topics: [ + kafkaTopic, + ], + SourceAccessConfigurations: [ + { + Type: 'SASL_SCRAM_512_AUTH', + URI: { + Ref: 'SecretA720EF05', + }, + }, + ], + })); + + test.done(); + }, + }, + + 'self managed kafka': { + 'default'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + + // WHEN + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + })); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'secretsmanager:GetSecretValue', + 'secretsmanager:DescribeSecret', + ], + Effect: 'Allow', + Resource: { + Ref: 'SecretA720EF05', + }, + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'FnServiceRoleDefaultPolicyC6A839BF', + Roles: [ + { + Ref: 'FnServiceRoleB9001A96', + }, + ], + })); + + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + FunctionName: { + Ref: 'Fn9270CBC0', + }, + BatchSize: 100, + SelfManagedEventSource: { + Endpoints: { + KafkaBootstrapServers: bootstrapServers, + }, + }, + StartingPosition: 'TRIM_HORIZON', + Topics: [ + kafkaTopic, + ], + SourceAccessConfigurations: [ + { + Type: 'SASL_SCRAM_512_AUTH', + URI: { + Ref: 'SecretA720EF05', + }, + }, + ], + })); + + test.done(); + }, + + VPC: { + 'correctly rendered in the stack'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + const sg = SecurityGroup.fromSecurityGroupId(stack, 'SecurityGroup', 'sg-0123456789'); + const vpc = new Vpc(stack, 'Vpc'); + + // WHEN + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + vpc: vpc, + vpcSubnets: { subnetType: SubnetType.PRIVATE }, + securityGroup: sg, + })); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'secretsmanager:GetSecretValue', + 'secretsmanager:DescribeSecret', + ], + Effect: 'Allow', + Resource: { + Ref: 'SecretA720EF05', + }, + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'FnServiceRoleDefaultPolicyC6A839BF', + Roles: [ + { + Ref: 'FnServiceRoleB9001A96', + }, + ], + })); + + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + FunctionName: { + Ref: 'Fn9270CBC0', + }, + BatchSize: 100, + SelfManagedEventSource: { + Endpoints: { + KafkaBootstrapServers: bootstrapServers, + }, + }, + StartingPosition: 'TRIM_HORIZON', + Topics: [ + kafkaTopic, + ], + SourceAccessConfigurations: [ + { + Type: 'SASL_SCRAM_512_AUTH', + URI: { + Ref: 'SecretA720EF05', + }, + }, + { + Type: 'VPC_SECURITY_GROUP', + URI: 'sg-0123456789', + }, + { + Type: 'VPC_SUBNET', + URI: { + Ref: 'VpcPrivateSubnet1Subnet536B997A', + }, + }, + { + Type: 'VPC_SUBNET', + URI: { + Ref: 'VpcPrivateSubnet2Subnet3788AAA1', + }, + }, + ], + })); + + test.done(); + }, + 'setting vpc requires vpcSubnets to be set'(test: Test) { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + const vpc = new Vpc(stack, 'Vpc'); + + test.throws(() => { + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + vpc: vpc, + securityGroup: SecurityGroup.fromSecurityGroupId(stack, 'SecurityGroup', 'sg-0123456789'), + + })); + }, /vpcSubnets must be set/); + + test.done(); + }, + + 'setting vpc requires securityGroup to be set'(test: Test) { + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + const vpc = new Vpc(stack, 'Vpc'); + + test.throws(() => { + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + vpc: vpc, + vpcSubnets: { subnetType: SubnetType.PRIVATE }, + })); + }, /securityGroup must be set/); + + test.done(); + }, + }, + + 'using SCRAM-SHA-256'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const kafkaTopic = 'some-topic'; + const secret = new Secret(stack, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' }); + const bootstrapServers = ['kafka-broker:9092']; + const sg = SecurityGroup.fromSecurityGroupId(stack, 'SecurityGroup', 'sg-0123456789'); + const vpc = new Vpc(stack, 'Vpc'); + + // WHEN + fn.addEventSource(new sources.SelfManagedKafkaEventSource( + { + bootstrapServers: bootstrapServers, + topic: kafkaTopic, + secret: secret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + vpc: vpc, + vpcSubnets: { subnetType: SubnetType.PRIVATE }, + securityGroup: sg, + authenticationMethod: sources.AuthenticationMethod.SASL_SCRAM_256_AUTH, + })); + + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + SourceAccessConfigurations: arrayWith( + { + Type: 'SASL_SCRAM_256_AUTH', + URI: { + Ref: 'SecretA720EF05', + }, + }, + ), + })); + + test.done(); + }, + }, + +} diff --git a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts index d44ce1cbea1b4..239bf58671b7e 100644 --- a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts +++ b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts @@ -4,12 +4,78 @@ import { IEventSourceDlq } from './dlq'; import { IFunction } from './function-base'; import { CfnEventSourceMapping } from './lambda.generated'; +/** + * The type of authentication protocol or the VPC components for your event source's SourceAccessConfiguration + * @see https://docs.aws.amazon.com/lambda/latest/dg/API_SourceAccessConfiguration.html#SSS-Type-SourceAccessConfiguration-Type + */ +export class SourceAccessConfigurationType { + + /** + * (MQ) The Secrets Manager secret that stores your broker credentials. + */ + public static readonly BASIC_AUTH = new SourceAccessConfigurationType('BASIC_AUTH'); + + /** + * The subnets associated with your VPC. Lambda connects to these subnets to fetch data from your Self-Managed Apache Kafka cluster. + */ + public static readonly VPC_SUBNET = new SourceAccessConfigurationType('VPC_SUBNET'); + + /** + * The VPC security group used to manage access to your Self-Managed Apache Kafka brokers. + */ + public static readonly VPC_SECURITY_GROUP = new SourceAccessConfigurationType('VPC_SECURITY_GROUP'); + + /** + * The Secrets Manager ARN of your secret key used for SASL SCRAM-256 authentication of your Self-Managed Apache Kafka brokers. + */ + public static readonly SASL_SCRAM_256_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_256_AUTH'); + + /** + * The Secrets Manager ARN of your secret key used for SASL SCRAM-512 authentication of your Self-Managed Apache Kafka brokers. + */ + public static readonly SASL_SCRAM_512_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_512_AUTH'); + + /** A custom source access configuration property */ + public static of(name: string): SourceAccessConfigurationType { + return new SourceAccessConfigurationType(name); + } + + /** + * The key to use in `SourceAccessConfigurationProperty.Type` property in CloudFormation + * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html#cfn-lambda-eventsourcemapping-sourceaccessconfiguration-type + */ + public readonly type: string; + + private constructor(type: string) { + this.type = type; + } +} + +/** + * Specific settings like the authentication protocol or the VPC components to secure access to your event source. + */ +export interface SourceAccessConfiguration { + /** + * The type of authentication protocol or the VPC components for your event source. For example: "SASL_SCRAM_512_AUTH". + */ + readonly type: SourceAccessConfigurationType, + /** + * The value for your chosen configuration in type. + * For example: "URI": "arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName". + * The exact string depends on the type. + * @see SourceAccessConfigurationType + */ + readonly uri: string +} + export interface EventSourceMappingOptions { /** * The Amazon Resource Name (ARN) of the event source. Any record added to * this stream can invoke the Lambda function. + * + * @default - not set if using a self managed Kafka cluster, throws an error otherwise */ - readonly eventSourceArn: string; + readonly eventSourceArn?: string; /** * The largest number of records that AWS Lambda will retrieve from your event @@ -101,6 +167,23 @@ export interface EventSourceMappingOptions { * @default - no topic */ readonly kafkaTopic?: string; + + /** + * A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster + * that a Kafka client connects to initially to bootstrap itself. + * They are in the format `abc.example.com:9096`. + * + * @default - none + */ + readonly kafkaBootstrapServers?: string[] + + /** + * Specific settings like the authentication protocol or the VPC components to secure access to your event source. + * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html + * + * @default - none + */ + readonly sourceAccessConfigurations?: SourceAccessConfiguration[] } /** @@ -154,6 +237,18 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp constructor(scope: Construct, id: string, props: EventSourceMappingProps) { super(scope, id); + if (props.eventSourceArn == undefined && props.kafkaBootstrapServers == undefined) { + throw new Error('Either eventSourceArn or kafkaBootstrapServers must be set'); + } + + if (props.eventSourceArn !== undefined && props.kafkaBootstrapServers !== undefined) { + throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive'); + } + + if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) { + throw new Error('kafkaBootStrapServers must not be empty if set'); + } + if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) { throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`); } @@ -183,6 +278,11 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp }; } + let selfManagedEventSource; + if (props.kafkaBootstrapServers) { + selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } }; + } + const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, bisectBatchOnFunctionError: props.bisectBatchOnError, @@ -196,6 +296,8 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp maximumRetryAttempts: props.retryAttempts, parallelizationFactor: props.parallelizationFactor, topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined, + sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}), + selfManagedEventSource, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; } diff --git a/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts b/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts index be42067f263f1..5d833a2d865c6 100644 --- a/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts +++ b/packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts @@ -170,4 +170,95 @@ describe('event source mapping', () => { }], }); }); + + test('throws if neither eventSourceArn nor kafkaBootstrapServers are set', () => { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline('exports.handler = ${handler.toString()}'), + runtime: Runtime.NODEJS_10_X, + }); + + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + })).toThrow(/Either eventSourceArn or kafkaBootstrapServers must be set/); + }); + + test('throws if both eventSourceArn and kafkaBootstrapServers are set', () => { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline('exports.handler = ${handler.toString()}'), + runtime: Runtime.NODEJS_10_X, + }); + + expect(() => new EventSourceMapping(stack, 'test', { + eventSourceArn: '', + kafkaBootstrapServers: [], + target: fn, + })).toThrow(/eventSourceArn and kafkaBootstrapServers are mutually exclusive/); + }); + + test('throws if both kafkaBootstrapServers is set but empty', () => { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline('exports.handler = ${handler.toString()}'), + runtime: Runtime.NODEJS_10_X, + }); + + expect(() => new EventSourceMapping(stack, 'test', { + kafkaBootstrapServers: [], + target: fn, + })).toThrow(/kafkaBootStrapServers must not be empty if set/); + }); + + test('eventSourceArn appears in stack', () => { + const stack = new cdk.Stack(); + const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', { + type: 'String', + }); + + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline('exports.handler = ${handler.toString()}'), + runtime: Runtime.NODEJS_10_X, + }); + + let eventSourceArn = 'some-arn'; + + new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: eventSourceArn, + kafkaTopic: topicNameParam.valueAsString, + }); + + expect(stack).toHaveResourceLike('AWS::Lambda::EventSourceMapping', { + EventSourceArn: eventSourceArn, + }); + }); + + test('kafkaBootstrapServers appears in stack', () => { + const stack = new cdk.Stack(); + const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', { + type: 'String', + }); + + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline('exports.handler = ${handler.toString()}'), + runtime: Runtime.NODEJS_10_X, + }); + + let kafkaBootstrapServers = ['kafka-broker.example.com:9092']; + new EventSourceMapping(stack, 'test', { + target: fn, + kafkaBootstrapServers: kafkaBootstrapServers, + kafkaTopic: topicNameParam.valueAsString, + }); + + expect(stack).toHaveResourceLike('AWS::Lambda::EventSourceMapping', { + SelfManagedEventSource: { Endpoints: { KafkaBootstrapServers: kafkaBootstrapServers } }, + }); + }); }); diff --git a/packages/@aws-cdk/aws-msk/README.md b/packages/@aws-cdk/aws-msk/README.md index 1de05861fc74f..51d93453f1eef 100644 --- a/packages/@aws-cdk/aws-msk/README.md +++ b/packages/@aws-cdk/aws-msk/README.md @@ -9,6 +9,14 @@ > > [CFN Resources]: https://docs.aws.amazon.com/cdk/latest/guide/constructs.html#constructs_lib +![cdk-constructs: Experimental](https://img.shields.io/badge/cdk--constructs-experimental-important.svg?style=for-the-badge) + +> The APIs of higher level constructs in this module are experimental and under active development. +> They are subject to non-backward compatible changes or removal in any future version. These are +> not subject to the [Semantic Versioning](https://semver.org/) model and breaking changes will be +> announced in the release notes. This means that while you may use them, you may need to update +> your source code when upgrading to a newer version of this package. + --- diff --git a/packages/@aws-cdk/aws-msk/lib/cluster.ts b/packages/@aws-cdk/aws-msk/lib/cluster.ts new file mode 100644 index 0000000000000..313bd3de5b107 --- /dev/null +++ b/packages/@aws-cdk/aws-msk/lib/cluster.ts @@ -0,0 +1,34 @@ +import { IResource, Resource } from '@aws-cdk/core'; +import { Construct } from 'constructs'; + +/** + * Represents an MSK cluster + */ +export interface ICluster extends IResource { + /** + * the ARN of the MSK cluster + */ + readonly clusterArn: string; +} + +/** + * An MSK cluster + */ +export class Cluster { + /** + * Creates a Cluster construct that represents an existing MSK cluster. + * @param scope + * @param id + * @param clusterArn + */ + public static fromClusterArn(scope: Construct, id: string, clusterArn: string): ICluster { + class Imported extends Resource implements ICluster { + public readonly clusterArn: string; + constructor() { + super(scope, id); + this.clusterArn = clusterArn; + } + } + return new Imported(); + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-msk/lib/index.ts b/packages/@aws-cdk/aws-msk/lib/index.ts index 3cf150cbd7076..9cc4acdc7d61f 100644 --- a/packages/@aws-cdk/aws-msk/lib/index.ts +++ b/packages/@aws-cdk/aws-msk/lib/index.ts @@ -1,2 +1,3 @@ +export * from './cluster'; // AWS::MSK CloudFormation Resources: export * from './msk.generated'; diff --git a/packages/@aws-cdk/aws-msk/package.json b/packages/@aws-cdk/aws-msk/package.json index 939b66619cda7..efa91da07d39f 100644 --- a/packages/@aws-cdk/aws-msk/package.json +++ b/packages/@aws-cdk/aws-msk/package.json @@ -90,7 +90,7 @@ "node": ">= 10.13.0 <13 || >=13.7.0" }, "stability": "experimental", - "maturity": "cfn-only", + "maturity": "experimental", "awscdkio": { "announce": false },