Skip to content

Commit

Permalink
Flatten to kafkaBootstrapServers
Browse files Browse the repository at this point in the history
- Add tests
- Better docs
  • Loading branch information
bracki committed Feb 11, 2021
1 parent 14314e5 commit f73eaef
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 26 deletions.
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource<SelfManagedKa
target.addEventSourceMapping(
`KafkaEventSource:${idHash}:${this.props.topic}`,
this.enrichMappingOptions({
selfManagedEventSource: { endpoints: { kafkaBootstrapServers: this.props.bootstrapServers } },
kafkaBootstrapServers: this.props.bootstrapServers,
kafkaTopic: this.props.topic,
startingPosition: this.props.startingPosition,
sourceAccessConfigurations,
Expand Down
53 changes: 28 additions & 25 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,6 @@ export interface SourceAccessConfiguration {
readonly uri: string
}

/**
* The endpoints for your self managed event source
*/
export interface SelfManagedEventSourceEndpoints {
/**
* A list of Kafka bootstrap servers in the format HOST:PORT, e.g. 'kafka-broker01:9096'
*/
readonly kafkaBootstrapServers: string[]
}

/**
* The configuration for your self managed event source, currently only Kafka is supported
*/
export interface SelfManagedEventSource {
/**
* The endpoints for your self managed event source
*/
readonly endpoints: SelfManagedEventSourceEndpoints
}

export interface EventSourceMappingOptions {
/**
* The Amazon Resource Name (ARN) of the event source. Any record added to
Expand Down Expand Up @@ -140,18 +120,20 @@ export interface EventSourceMappingOptions {
readonly kafkaTopic?: string;

/**
* Specific settings like the authentication protocol or the VPC components to secure access to your event source.
* A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster
* that a Kafka client connects to initially to bootstrap itself.
* They are in the format `abc.example.com:9096`.
*
* @default - none
*/
readonly sourceAccessConfigurations?: SourceAccessConfiguration[]
readonly kafkaBootstrapServers?: string[]

/**
* The configuration for your self managed event source
* Specific settings like the authentication protocol or the VPC components to secure access to your event source.
*
* @default - none
*/
readonly selfManagedEventSource?: SelfManagedEventSource
readonly sourceAccessConfigurations?: SourceAccessConfiguration[]
}

/**
Expand Down Expand Up @@ -205,6 +187,18 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
constructor(scope: Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.eventSourceArn == undefined && props.kafkaBootstrapServers == undefined) {
throw new Error('Either eventSourceArn or kafkaBootstrapServers must be set');
}

if (props.eventSourceArn !== undefined && props.kafkaBootstrapServers !== undefined) {
throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive');
}

if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) {
throw new Error('kafkaBootStrapServers must not be empty if set');
}

if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
}
Expand Down Expand Up @@ -234,6 +228,15 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
};
}

let selfManagedEventSource;
if (props.kafkaBootstrapServers) {
selfManagedEventSource = {
endpoints: {
kafkaBootstrapServers: props.kafkaBootstrapServers,
},
};
}

const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
bisectBatchOnFunctionError: props.bisectBatchOnError,
Expand All @@ -248,7 +251,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
parallelizationFactor: props.parallelizationFactor,
topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
sourceAccessConfigurations: props.sourceAccessConfigurations,
selfManagedEventSource: props.selfManagedEventSource,
selfManagedEventSource,
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}
Expand Down
42 changes: 42 additions & 0 deletions packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,46 @@ describe('event source mapping', () => {
}],
});
});

test('throws if neither eventSourceArn nor kafkaBootstrapServers are set', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

expect(() => new EventSourceMapping(stack, 'test', {
target: fn,
})).toThrow(/Either eventSourceArn or kafkaBootstrapServers must be set/);
});

test('throws if both eventSourceArn and kafkaBootstrapServers are set', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

expect(() => new EventSourceMapping(stack, 'test', {
eventSourceArn: '',
kafkaBootstrapServers: [],
target: fn,
})).toThrow(/eventSourceArn and kafkaBootstrapServers are mutually exclusive/);
});

test('throws if both kafkaBootstrapServers is set but empty', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

expect(() => new EventSourceMapping(stack, 'test', {
kafkaBootstrapServers: [],
target: fn,
})).toThrow(/kafkaBootStrapServers must not be empty if set/);
});
});

0 comments on commit f73eaef

Please sign in to comment.