From 3c654fb7fbdfef93455f662f8e49780437097e14 Mon Sep 17 00:00:00 2001 From: Tatsuya Yamamoto Date: Tue, 1 Feb 2022 02:49:35 +0900 Subject: [PATCH] feat(iot): add Action to republish MQTT messages to another MQTT topic (#18661) resolve https://github.com/aws/aws-cdk/issues/17701 ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- packages/@aws-cdk/aws-iot-actions/README.md | 17 +++ .../@aws-cdk/aws-iot-actions/lib/index.ts | 1 + .../lib/iot-republish-action.ts | 72 ++++++++++++ .../integ.iot-republish-action.expected.json | 65 +++++++++++ .../test/iot/integ.iot-republish-action.ts | 25 ++++ .../test/iot/iot-republish-action.test.ts | 107 ++++++++++++++++++ 6 files changed, 287 insertions(+) create mode 100644 packages/@aws-cdk/aws-iot-actions/lib/iot-republish-action.ts create mode 100644 packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.expected.json create mode 100644 packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.ts create mode 100644 packages/@aws-cdk/aws-iot-actions/test/iot/iot-republish-action.test.ts diff --git a/packages/@aws-cdk/aws-iot-actions/README.md b/packages/@aws-cdk/aws-iot-actions/README.md index be475fe028210..860643683efd1 100644 --- a/packages/@aws-cdk/aws-iot-actions/README.md +++ b/packages/@aws-cdk/aws-iot-actions/README.md @@ -21,6 +21,7 @@ supported AWS Services. Instances of these classes should be passed to Currently supported are: +- Republish a message to another MQTT topic - Invoke a Lambda function - Put objects to a S3 bucket - Put logs to CloudWatch Logs @@ -30,6 +31,22 @@ Currently supported are: - Put records to Kinesis Data Firehose stream - Send messages to SQS queues +## Republish a message to another MQTT topic + +The code snippet below creates an AWS IoT Rule that republish a message to +another MQTT topic when it is triggered. + +```ts +new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id, timestamp() as timestamp, temperature FROM 'device/+/data'"), + actions: [ + new actions.IotRepublishMqttAction('${topic()}/republish', { + qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE, // optional property, default is MqttQualityOfService.ZERO_OR_MORE_TIMES + }), + ], +}); +``` + ## Invoke a Lambda function The code snippet below creates an AWS IoT Rule that invoke a Lambda function diff --git a/packages/@aws-cdk/aws-iot-actions/lib/index.ts b/packages/@aws-cdk/aws-iot-actions/lib/index.ts index a0ee864a5bce2..c3a7bb547b1c8 100644 --- a/packages/@aws-cdk/aws-iot-actions/lib/index.ts +++ b/packages/@aws-cdk/aws-iot-actions/lib/index.ts @@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action'; export * from './cloudwatch-set-alarm-state-action'; export * from './common-action-props'; export * from './firehose-put-record-action'; +export * from './iot-republish-action'; export * from './kinesis-put-record-action'; export * from './lambda-function-action'; export * from './s3-put-object-action'; diff --git a/packages/@aws-cdk/aws-iot-actions/lib/iot-republish-action.ts b/packages/@aws-cdk/aws-iot-actions/lib/iot-republish-action.ts new file mode 100644 index 0000000000000..77aadb876c4d9 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/lib/iot-republish-action.ts @@ -0,0 +1,72 @@ +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import { CommonActionProps } from './common-action-props'; +import { singletonActionRole } from './private/role'; + +/** + * MQTT Quality of Service (QoS) indicates the level of assurance for delivery of an MQTT Message. + * + * @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos + */ +export enum MqttQualityOfService { + /** + * QoS level 0. Sent zero or more times. + * This level should be used for messages that are sent over reliable communication links or that can be missed without a problem. + */ + ZERO_OR_MORE_TIMES, + + /** + * QoS level 1. Sent at least one time, and then repeatedly until a PUBACK response is received. + * The message is not considered complete until the sender receives a PUBACK response to indicate successful delivery. + */ + AT_LEAST_ONCE, +} + +/** + * Configuration properties of an action to republish MQTT messages. + */ +export interface IotRepublishMqttActionProps extends CommonActionProps { + /** + * The Quality of Service (QoS) level to use when republishing messages. + * + * @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos + * + * @default MqttQualityOfService.ZERO_OR_MORE_TIMES + */ + readonly qualityOfService?: MqttQualityOfService; +} + +/** + * The action to put the record from an MQTT message to republish another MQTT topic. + */ +export class IotRepublishMqttAction implements iot.IAction { + private readonly qualityOfService?: MqttQualityOfService; + private readonly role?: iam.IRole; + + /** + * @param topic The MQTT topic to which to republish the message. + * @param props Optional properties to not use default. + */ + constructor(private readonly topic: string, props: IotRepublishMqttActionProps = {}) { + this.qualityOfService = props.qualityOfService; + this.role = props.role; + } + + bind(rule: iot.ITopicRule): iot.ActionConfig { + const role = this.role ?? singletonActionRole(rule); + role.addToPrincipalPolicy(new iam.PolicyStatement({ + actions: ['iot:Publish'], + resources: ['*'], + })); + + return { + configuration: { + republish: { + topic: this.topic, + qos: this.qualityOfService, + roleArn: role.roleArn, + }, + }, + }; + } +} diff --git a/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.expected.json b/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.expected.json new file mode 100644 index 0000000000000..c396017676ac4 --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.expected.json @@ -0,0 +1,65 @@ +{ + "Resources": { + "TopicRule40A4EA44": { + "Type": "AWS::IoT::TopicRule", + "Properties": { + "TopicRulePayload": { + "Actions": [ + { + "Republish": { + "Qos": 1, + "RoleArn": { + "Fn::GetAtt": [ + "TopicRuleTopicRuleActionRole246C4F77", + "Arn" + ] + }, + "Topic": "${topic()}/republish" + } + } + ], + "AwsIotSqlVersion": "2016-03-23", + "Sql": "SELECT * FROM 'device/+/data'" + } + } + }, + "TopicRuleTopicRuleActionRole246C4F77": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "iot.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + } + } + }, + "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "iot:Publish", + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", + "Roles": [ + { + "Ref": "TopicRuleTopicRuleActionRole246C4F77" + } + ] + } + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.ts b/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.ts new file mode 100644 index 0000000000000..d5a71b32f6a0d --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/iot/integ.iot-republish-action.ts @@ -0,0 +1,25 @@ +import * as iot from '@aws-cdk/aws-iot'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +class TestStack extends cdk.Stack { + constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { + super(scope, id, props); + + const topicRule = new iot.TopicRule(this, 'TopicRule', { + sql: iot.IotSql.fromStringAsVer20160323( + "SELECT * FROM 'device/+/data'", + ), + }); + + topicRule.addAction( + new actions.IotRepublishMqttAction('${topic()}/republish', { + qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE, + }), + ); + } +} + +const app = new cdk.App(); +new TestStack(app, 'iot-republish-action-test-stack'); +app.synth(); diff --git a/packages/@aws-cdk/aws-iot-actions/test/iot/iot-republish-action.test.ts b/packages/@aws-cdk/aws-iot-actions/test/iot/iot-republish-action.test.ts new file mode 100644 index 0000000000000..8a78e272285ef --- /dev/null +++ b/packages/@aws-cdk/aws-iot-actions/test/iot/iot-republish-action.test.ts @@ -0,0 +1,107 @@ +import { Template, Match } from '@aws-cdk/assertions'; +import * as iam from '@aws-cdk/aws-iam'; +import * as iot from '@aws-cdk/aws-iot'; +import * as cdk from '@aws-cdk/core'; +import * as actions from '../../lib'; + +let stack: cdk.Stack; +let topicRule:iot.TopicRule; +beforeEach(() => { + stack = new cdk.Stack(); + topicRule = new iot.TopicRule(stack, 'MyTopicRule', { + sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), + }); +}); + +test('Default IoT republish action', () => { + // WHEN + topicRule.addAction( + new actions.IotRepublishMqttAction('test-topic'), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + { + Republish: { + Topic: 'test-topic', + RoleArn: { + 'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], + }, + }, + }, + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: 'sts:AssumeRole', + Effect: 'Allow', + Principal: { + Service: 'iot.amazonaws.com', + }, + }, + ], + Version: '2012-10-17', + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: 'iot:Publish', + Effect: 'Allow', + Resource: '*', + }, + ], + Version: '2012-10-17', + }, + PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', + Roles: [ + { Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, + ], + }); +}); + +test('can set qualityOfService', () => { + // WHEN + topicRule.addAction( + new actions.IotRepublishMqttAction('test-topic', { qualityOfService: actions.MqttQualityOfService.AT_LEAST_ONCE }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Republish: { Qos: 1 } }), + ], + }, + }); +}); + +test('can set role', () => { + // WHEN + const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); + topicRule.addAction( + new actions.IotRepublishMqttAction('test-topic', { role }), + ); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { + TopicRulePayload: { + Actions: [ + Match.objectLike({ Republish: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), + ], + }, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyName: 'MyRolePolicy64AB00A5', + Roles: ['ForTest'], + }); +});