diff --git a/.github/workflows/build-timestamped-master.yml b/.github/workflows/build-timestamped-master.yml new file mode 100644 index 0000000..f5023db --- /dev/null +++ b/.github/workflows/build-timestamped-master.yml @@ -0,0 +1,47 @@ +name: Build + +on: + push: + branches: + - main + paths-ignore: + - '*.md' + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + if: github.repository_owner == 'xlibb' + steps: + - name: Checkout Repository + uses: actions/checkout@v2 + - name: Set up JDK 11 + uses: actions/setup-java@v2 + with: + distribution: 'temurin' + java-version: 11 + - name: Change to Timestamped Version + run: | + startTime=$(TZ="Asia/Kolkata" date +'%Y%m%d-%H%M00') + latestCommit=$(git log -n 1 --pretty=format:"%h") + VERSION=$((grep -w 'version' | cut -d= -f2) < gradle.properties | rev | cut --complement -d- -f1 | rev) + updatedVersion=$VERSION-$startTime-$latestCommit + echo $updatedVersion + sed -i "s/version=\(.*\)/version=$updatedVersion/g" gradle.properties + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + env: + packageUser: ${{ secrets.BALLERINA_BOT_USERNAME }} + packagePAT: ${{ secrets.BALLERINA_BOT_TOKEN }} + publishUser: ${{ secrets.BALLERINA_BOT_USERNAME }} + publishPAT: ${{ secrets.BALLERINA_BOT_TOKEN }} + run: | + ./gradlew publish --scan --no-daemon + - name: Generate Codecov Report + uses: codecov/codecov-action@v3 + - name: Upload Artifact + uses: actions/upload-artifact@v2 + with: + name: ballerina-runtime + path: target/ballerina-runtime/ diff --git a/.github/workflows/build-with-bal-test-graalvm.yml b/.github/workflows/build-with-bal-test-graalvm.yml new file mode 100644 index 0000000..4144b1f --- /dev/null +++ b/.github/workflows/build-with-bal-test-graalvm.yml @@ -0,0 +1,39 @@ +name: GraalVM Check + +on: + workflow_dispatch: + inputs: + lang_tag: + description: Branch/Release Tag of the Ballerina Lang + required: true + default: master + lang_version: + description: Ballerina Lang Version (If given ballerina lang buid will be skipped) + required: false + default: '' + native_image_options: + description: Default native-image options + required: false + default: '' + schedule: + - cron: '30 18 * * *' + pull_request: + branches: + - master + types: [opened, synchronize, reopened, labeled, unlabeled] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }} + cancel-in-progress: true + +jobs: + call_stdlib_workflow: + name: Run StdLib Workflow + if: ${{ github.event_name != 'schedule' || (github.event_name == 'schedule' && github.repository_owner == 'ballerina-platform') }} + uses: ballerina-platform/ballerina-standard-library/.github/workflows/build-with-bal-test-graalvm-template.yml@main + with: + lang_tag: ${{ inputs.lang_tag }} + lang_version: ${{ inputs.lang_version }} + native_image_options: ${{ inputs.native_image_options }} + additional_ubuntu_build_flags: '' + additional_windows_build_flags: '-x test' diff --git a/.github/workflows/daily-build.yml b/.github/workflows/daily-build.yml new file mode 100644 index 0000000..c06b24c --- /dev/null +++ b/.github/workflows/daily-build.yml @@ -0,0 +1,33 @@ +name: Daily build + +on: + repository_dispatch: + types: + check_connector_for_breaking_changes + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Set up JDK 11 + uses: actions/setup-java@v2 + with: + distribution: 'temurin' + java-version: 11 + + - name: Set environment variable + if: github.event.action == 'check_connector_for_breaking_changes' + run: | + echo "BUILD_USING_DOCKER=-PbuildUsingDocker=nightly" >> $GITHUB_ENV + echo "GRADLE_SKIP_TASKS=" >> $GITHUB_ENV + + # Build the project with Gradle + - name: Build with Gradle + env: + packageUser: ${{ github.actor }} + packagePAT: ${{ secrets.GITHUB_TOKEN }} + run: | + ./gradlew clean build $GRADLE_SKIP_TASKS $BUILD_USING_DOCKER diff --git a/.github/workflows/stale_check.yml b/.github/workflows/stale_check.yml new file mode 100644 index 0000000..8763360 --- /dev/null +++ b/.github/workflows/stale_check.yml @@ -0,0 +1,19 @@ +name: 'Close stale pull requests' + +on: + schedule: + - cron: '30 19 * * *' + workflow_dispatch: + +jobs: + stale: + runs-on: ubuntu-latest + steps: + - uses: actions/stale@v3 + with: + stale-pr-message: 'This PR has been open for more than 15 days with no activity. This will be closed in 3 days unless the `stale` label is removed or commented.' + close-pr-message: 'Closed PR due to inactivity for more than 18 days.' + days-before-pr-stale: 15 + days-before-pr-close: 3 + days-before-issue-stale: -1 + days-before-issue-close: -1 diff --git a/README.md b/README.md index d1a16a9..3fc7c1c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,12 @@ # Ballerina MQTT Library -This package provides an implementation to interact with MQTT servers via MQTT client and listener. +[![Build](https://github.com/xlibb/module-mqtt/actions/workflows/build-timestamped-master.yml/badge.svg)](https://github.com/xlibb/module-mqtt/actions/workflows/build-timestamped-master.yml) +[![codecov](https://codecov.io/gh/xlibb/module-mqtt/branch/master/graph/badge.svg)](https://codecov.io/gh/xlibb/module-mqtt) +[![Trivy](https://github.com/xlibb/module-mqtt/actions/workflows/trivy-scan.yml/badge.svg)](https://github.com/xlibb/module-mqtt/actions/workflows/trivy-scan.yml) +[![GraalVM Check](https://github.com/xlibb/module-mqtt/actions/workflows/build-with-bal-test-graalvm.yml/badge.svg)](https://github.com/xlibb/module-mqtt/actions/workflows/build-with-bal-test-graalvm.yml) +[![GitHub Last Commit](https://img.shields.io/github/last-commit/xlibb/module-mqtt.svg)](https://github.com/xlibb/module-mqtt/commits/master) + +This Library provides an implementation to interact with MQTT servers via MQTT client and listener. MQTT is a lightweight, publish-subscribe, machine to machine network protocol for message queue/message queuing service. diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index eadc294..dfa4666 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "xlibb" name = "mqtt" -version = "0.1.4" +version = "0.2.0" authors = ["xlibb"] keywords = ["mqtt"] repository = "https://github.com/xlibb/module-mqtt" @@ -10,12 +10,18 @@ distribution = "2201.6.0" [[platform.java11.dependency]] groupId = "io.xlibb" artifactId = "mqtt-native" -version = "0.1.4" -path = "../native/build/libs/mqtt-native-0.1.4.jar" +version = "0.2.0" +path = "../native/build/libs/mqtt-native-0.2.0-SNAPSHOT.jar" # Azure dependencies [[platform.java11.dependency]] groupId = "org.eclipse.paho" -artifactId = "org.eclipse.paho.client.mqttv3" +artifactId = "org.eclipse.paho.mqttv5.client" version = "1.2.5" -path = "./lib/org.eclipse.paho.client.mqttv3-1.2.5.jar" +path = "./lib/org.eclipse.paho.mqttv5.client-1.2.5.jar" + +[[platform.java11.dependency]] +groupId = "org.bouncycastle" +artifactId = "bcpkix-jdk15on" +version = "1.69" +path = "./lib/bcpkix-jdk15on-1.69.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 38a930c..35aa720 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -11,11 +11,13 @@ distribution-version = "2201.6.0" org = "ballerina" name = "crypto" version = "2.3.1" -scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} ] +modules = [ + {org = "ballerina", packageName = "crypto", moduleName = "crypto"} +] [[package]] org = "ballerina" @@ -133,7 +135,6 @@ modules = [ org = "ballerina" name = "time" version = "2.2.5" -scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -156,8 +157,9 @@ modules = [ [[package]] org = "xlibb" name = "mqtt" -version = "0.1.4" +version = "0.2.0" dependencies = [ + {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.runtime"}, {org = "ballerina", name = "log"}, diff --git a/ballerina/build.gradle b/ballerina/build.gradle index c94f020..af323bb 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -52,17 +52,20 @@ configurations { dependencies { /* Azure dependencies */ - externalJars(group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version: "${pahoMqttVersion}") { + externalJars(group: 'org.eclipse.paho', name: 'org.eclipse.paho.mqttv5.client', version: "${pahoMqtt5Version}") { + transitive = false + } + externalJars(group: 'org.bouncycastle', name: 'bcpkix-jdk15on', version: "${bouncycastleVersion}") { transitive = false } } task updateTomlFiles { doLast { - def stdlibDependentPahoMqttVersion = project.pahoMqttVersion def newConfig = ballerinaTomlFilePlaceHolder.text.replace("@project.version@", project.version) newConfig = newConfig.replace("@toml.version@", tomlVersion) - newConfig = newConfig.replace("@paho.mqtt.version@", stdlibDependentPahoMqttVersion) + newConfig = newConfig.replace("@paho.mqtt.version@", project.pahoMqtt5Version) + newConfig = newConfig.replace("@bouncy.castle.version@", project.bouncycastleVersion) ballerinaTomlFile.text = newConfig } } diff --git a/ballerina/errors.bal b/ballerina/errors.bal index caf4234..86203fc 100644 --- a/ballerina/errors.bal +++ b/ballerina/errors.bal @@ -1,2 +1,9 @@ # The common error type for the module. -public type Error distinct error; +public type Error distinct error; + +# The error details type for the module. +# +# + reasonCode - The reason code for the error +public type ErrorDetails record {| + int reasonCode?; +|}; diff --git a/ballerina/listener.bal b/ballerina/listener.bal index ba3c525..42691e3 100644 --- a/ballerina/listener.bal +++ b/ballerina/listener.bal @@ -3,19 +3,35 @@ import ballerina/jballerina.java; # Represents a MQTT listener endpoint. public isolated client class Listener { - private final string[] & readonly topics; + private final Subscription[] & readonly mqttSubscriptions; # Creates a new `mqtt:Listener`. # # + serverUri - The URI of the remote MQTT server # + clientId - The unique client ID to identify the listener - # + topics - The topics to be subscribed to + # + subscriptions - The topics to be subscribed to # + return - `mqtt:Error` if an error occurs while creating the listener - public isolated function init(string serverUri, string clientId, string|string[] topics, *ListenerConfiguration config) returns Error? { - if topics is string { - self.topics = [topics]; + public isolated function init(string serverUri, string clientId, string|string[]|Subscription|Subscription[] subscriptions, *ListenerConfiguration config) returns Error? { + if subscriptions is Subscription { + self.mqttSubscriptions = [subscriptions.cloneReadOnly()]; + } else if subscriptions is string { + self.mqttSubscriptions = [{ + topic: subscriptions, + qos: 1 + }]; + } else if subscriptions is string[] { + Subscription[] mqttSubscriptions = []; + foreach string topic in subscriptions { + mqttSubscriptions.push({ + topic: topic, + qos: 1 + }); + } + self.mqttSubscriptions = mqttSubscriptions.cloneReadOnly(); + } else if subscriptions is Subscription[] { + self.mqttSubscriptions = subscriptions.cloneReadOnly(); } else { - self.topics = topics.cloneReadOnly(); + panic error("Invalid topics provided"); } check self.externInit(serverUri, clientId, config); } @@ -32,10 +48,10 @@ public isolated client class Listener { # # + return - A `error` if an error is encountered while starting the server or else `()` public isolated function 'start() returns error? { - check self.externStart(self.topics); + check self.externStart(self.mqttSubscriptions); }; - private isolated function externStart(string[] topics) returns error? = + private isolated function externStart(Subscription[] topics) returns error? = @java:Method { 'class: "io.xlibb.mqtt.listener.ListenerActions" } external; diff --git a/ballerina/tests/publish_subscribe_tests.bal b/ballerina/tests/publish_subscribe_tests.bal index fc51858..43628cb 100644 --- a/ballerina/tests/publish_subscribe_tests.bal +++ b/ballerina/tests/publish_subscribe_tests.bal @@ -3,21 +3,12 @@ import ballerina/lang.runtime; import ballerina/test; import ballerina/uuid; -string receivedMessage = ""; - -service on new Listener("ssl://localhost:1883", uuid:createType1AsString(), "mqtt/test", { - connectionConfig: { - username: "ballerina", - password: "ballerinamqtt", - connectionTimeout: 100, - secureSocket: { - cert: "tests/resources/certsandkeys/ca.crt" - } - } -}) { +final string[] receivedMessages = []; + +final Service basicService = service object { remote function onMessage(Message message) returns error? { log:printInfo(check string:fromBytes(message.payload)); - receivedMessage = check string:fromBytes(message.payload); + receivedMessages.push(check string:fromBytes(message.payload)); } remote function onError(Error err) returns error? { @@ -28,21 +19,419 @@ service on new Listener("ssl://localhost:1883", uuid:createType1AsString(), "mqt log:printInfo("Message delivered " + token.messageId.toString()); log:printInfo(check string:fromBytes(token.message.payload)); } -} +}; @test:Config {enable: true} function basicPublishSubscribeTest() returns error? { - Client 'client = check new ("ssl://localhost:1883", uuid:createType1AsString(), { + Listener 'listener = check new (NO_AUTH_ENDPOINT, uuid:createType1AsString(), "mqtt/basictest"); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_ENDPOINT, uuid:createType1AsString()); + string message = "Test message for basic pub sub test"; + check 'client->publish("mqtt/basictest", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + lock { + test:assertTrue(receivedMessages.indexOf(message) != ()); + } +} + +@test:Config {enable: true} +function basicPublishSubscribeWithAuthTest() returns error? { + Listener 'listener = check new (AUTH_ONLY_ENDPOINT, uuid:createType1AsString(), "mqtt/basictest", {connectionConfig: authConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_ONLY_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authConnConfig}); + string message = "Test message for basic pub sub with auth test"; + check 'client->publish("mqtt/basictest", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function basicPublishSubscribeWithTLSTest() returns error? { + Listener 'listener = check new (NO_AUTH_ENCRYPTED_ENDPOINT, uuid:createType1AsString(), "mqtt/basictest", {connectionConfig: tlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_ENCRYPTED_ENDPOINT, uuid:createType1AsString(), {connectionConfig: tlsConnConfig}); + string message = "Test message for basic pub sub with tls test"; + check 'client->publish("mqtt/basictest", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function basicPublishSubscribeWithMTLSTest() returns error? { + Listener 'listener = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/basictest", {connectionConfig: mtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for basic pub sub with mtls test"; + check 'client->publish("mqtt/basictest", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function basicPublishSubscribeWithAuthAndMTLSTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/basictest", {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message = "Test message for basic pub sub with auth and mtls test"; + check 'client->publish("mqtt/basictest", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function subscribeToMultipleTopicsTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), ["mqtt/topic1", "mqtt/topic2"], {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message1 = "Test message for topic 1"; + string message2 = "Test message for topic 1"; + check 'client->publish("mqtt/topic1", {payload: message1.toBytes()}); + check 'client->publish("mqtt/topic2", {payload: message1.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message1) != ()); + test:assertTrue(receivedMessages.indexOf(message2) != ()); +} + +@test:Config {enable: true} +function subscribeToSubscriptionTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {topic: "mqtt/subscriptiontopic", qos: 2}, {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message = "Test message for subscription"; + check 'client->publish("mqtt/subscriptiontopic", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function subscribeToMultipleSubscriptionsTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), [{topic: "mqtt/subscriptiontopic1", qos: 2}, {topic: "mqtt/subscriptiontopic2", qos: 0}], {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message1 = "Test message for subscription1"; + string message2 = "Test message for subscription2"; + check 'client->publish("mqtt/subscriptiontopic1", {payload: message1.toBytes()}); + check 'client->publish("mqtt/subscriptiontopic2", {payload: message2.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message1) != ()); + test:assertTrue(receivedMessages.indexOf(message2) != ()); +} + +@test:Config {enable: true} +function publishSubscribeWithMTLSTrustKeyStoresTest() returns error? { + Listener 'listener = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/trustkeystorestopic", {connectionConfig: mtlsWithTrustKeyStoreConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsWithTrustKeyStoreConnConfig}); + string message = "Test message for mtls with trust and key stores"; + check 'client->publish("mqtt/trustkeystorestopic", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function subscribeWithManualAcks() returns error? { + Listener 'listener = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/manualackstopic", {connectionConfig: mtlsConnConfig, manualAcks: true}); + Service manualAcksService = service object { + remote function onMessage(Message message, Caller caller) returns error? { + log:printInfo(check string:fromBytes(message.payload)); + receivedMessages.push(check string:fromBytes(message.payload)); + check caller->complete(); + } + remote function onError(Error err) returns error? { + log:printError("Error occured ", err); + } + remote function onCompleted(DeliveryToken token) returns error? { + log:printInfo("Message delivered " + token.messageId.toString()); + log:printInfo(check string:fromBytes(token.message.payload)); + } + }; + check 'listener.attach(manualAcksService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for manual acks"; + check 'client->publish("mqtt/manualackstopic", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); +} + +@test:Config {enable: true} +function closeWithoutDisconnectTest() returns error? { + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for closing without disconnect"; + check 'client->publish("mqtt/unrelated", {payload: message.toBytes()}); + Error? err = 'client->close(); + if err is Error { + test:assertEquals(err.message(), "Client is connected"); + test:assertEquals(err.detail().reasonCode, 32100); + } else { + test:assertFail("Expected an error when closing without disconnecting"); + } +} + +@test:Config {enable: true} +function clientIsConnectedTest() returns error? { + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for checking if client is connected"; + check 'client->publish("mqtt/unrelated", {payload: message.toBytes()}); + boolean isConnected = check 'client->isConnected(); + test:assertTrue(isConnected); + check 'client->disconnect(); + isConnected = check 'client->isConnected(); + test:assertFalse(isConnected); +} + +@test:Config {enable: true} +function clientReconnectTest() returns error? { + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for reconnecting with the server"; + check 'client->publish("mqtt/unrelated", {payload: message.toBytes()}); + boolean isConnected = check 'client->isConnected(); + test:assertTrue(isConnected); + check 'client->disconnect(); + isConnected = check 'client->isConnected(); + test:assertFalse(isConnected); + check 'client->reconnect(); + runtime:sleep(10); + isConnected = check 'client->isConnected(); + test:assertTrue(isConnected); + check stopListenerAndClient('client = 'client); +} + +@test:Config {enable: true} +function invalidUrlClientTest() returns error? { + Client|Error result = new (INVALID_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + if result is Error { + test:assertEquals(result.message(), string `no NetworkModule installed for scheme "http" of URI "http://localhost:8888"`); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {enable: true} +function invalidUrlListenerTest() returns error? { + Listener|Error result = new (INVALID_ENDPOINT, uuid:createType1AsString(), "mqtt/unrelated", {connectionConfig: mtlsConnConfig}); + if result is Error { + test:assertEquals(result.message(), string `no NetworkModule installed for scheme "http" of URI "http://localhost:8888"`); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {enable: true} +function invalidCertPathClientTest() returns error? { + Client|Error result = new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), { + connectionConfig: { + secureSocket: { + cert: SERVER_CERT_PATH, + key: { + path: INCORRECT_KEYSTORE_PATH, + password: KEYSTORE_PASSWORD + } + } + } + }); + if result is Error { + test:assertEquals(result.message(), string `tests/resources/certsandkeys/invalid-keystore.p12 (No such file or directory)`); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {enable: true} +function invalidCertPathListenerTest() returns error? { + Listener|Error result = new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/unrelated", { connectionConfig: { - username: "ballerina", - password: "ballerinamqtt", - connectionTimeout: 100, secureSocket: { - cert: "tests/resources/certsandkeys/ca.crt" + cert: SERVER_CERT_PATH, + key: { + path: INCORRECT_KEYSTORE_PATH, + password: KEYSTORE_PASSWORD + } } } }); - check 'client->publish("mqtt/test", {payload: "This is a test message".toBytes()}); + if result is Error { + test:assertEquals(result.message(), string `tests/resources/certsandkeys/invalid-keystore.p12 (No such file or directory)`); + } else { + test:assertFail("Expected an error"); + } +} + +@test:Config {enable: true} +function listenerGracefulStopTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/gracefulstoptopic", {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message = "Test message 1 for graceful stop"; + check 'client->publish("mqtt/gracefulstoptopic", {payload: message.toBytes()}); + runtime:sleep(1); + test:assertTrue(receivedMessages.indexOf(message) != ()); + check 'listener.gracefulStop(); + message = "Test message 2 for graceful stop"; + check 'client->publish("mqtt/gracefulstoptopic", {payload: message.toBytes()}); + test:assertTrue(receivedMessages.indexOf(message) == ()); + check stopListenerAndClient('client = 'client); +} + +@test:Config {enable: true} +function listenerImmediateStopTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/immediatestoptopic", {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message = "Test message 1 for immediate stop"; + check 'client->publish("mqtt/immediatestoptopic", {payload: message.toBytes()}); + runtime:sleep(1); + test:assertTrue(receivedMessages.indexOf(message) != ()); + check 'listener.immediateStop(); + message = "Test message 2 for immediate stop"; + check 'client->publish("mqtt/immediatestoptopic", {payload: message.toBytes()}); + test:assertTrue(receivedMessages.indexOf(message) == ()); + check stopListenerAndClient('client = 'client); +} + +@test:Config {enable: true} +function listenerDetachTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/detachtopic", {connectionConfig: authMtlsConnConfig}); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: authMtlsConnConfig}); + string message = "Test message 1 for detach"; + check 'client->publish("mqtt/detachtopic", {payload: message.toBytes()}); runtime:sleep(1); - test:assertEquals(receivedMessage, "This is a test message"); + test:assertTrue(receivedMessages.indexOf(message) != ()); + check 'listener.detach(basicService); + message = "Test message 2 for detach"; + check 'client->publish("mqtt/detachtopic", {payload: message.toBytes()}); + test:assertTrue(receivedMessages.indexOf(message) == ()); + check stopListenerAndClient('client = 'client); +} + +@test:Config {enable: true} +function serviceWithoutOnMessageTest() returns error? { + Listener 'listener = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/noonmessagetopic", {connectionConfig: mtlsConnConfig, manualAcks: true}); + string errorMessage = ""; + Service noOnMsgService = service object { + remote function onError(Error err) returns error? { + log:printError("Error occured ", err); + errorMessage = err.message(); + } + }; + check 'listener.attach(noOnMsgService); + check 'listener.'start(); + + Client 'client = check new (NO_AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), {connectionConfig: mtlsConnConfig}); + string message = "Test message for service without onmessage method"; + check 'client->publish("mqtt/noonmessagetopic", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertEquals(errorMessage, "method onMessage not found"); +} + +@test:Config {enable: true} +function clientListenerConfigTest() returns error? { + Listener 'listener = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), "mqtt/allconnconfigtopic", { + connectionConfig: { + username: AUTH_USERNAME, + password: AUTH_PASSWORD, + automaticReconnect: false, + cleanSession: true, + connectionTimeout: 10, + keepAliveInterval: 10, + maxReconnectDelay: 10, + secureSocket: { + cert: SERVER_CERT_PATH, + key: { + path: KEYSTORE_PATH, + password: KEYSTORE_PASSWORD + } + }, + serverUris: ["ssl://localhost:8889", "ssl://localhost:8890"] + }, + manualAcks: false + }); + check 'listener.attach(basicService); + check 'listener.'start(); + + Client 'client = check new (AUTH_MTLS_ENDPOINT, uuid:createType1AsString(), { + connectionConfig: { + username: AUTH_USERNAME, + password: AUTH_PASSWORD, + automaticReconnect: false, + cleanSession: true, + connectionTimeout: 10, + keepAliveInterval: 10, + maxReconnectDelay: 10, + secureSocket: { + cert: SERVER_CERT_PATH, + key: { + path: KEYSTORE_PATH, + password: KEYSTORE_PASSWORD + } + }, + serverUris: ["ssl://localhost:8889", "ssl://localhost:8890"] + } + }); + string message = "Test message for service with all connection configs"; + check 'client->publish("mqtt/allconnconfigtopic", {payload: message.toBytes()}); + runtime:sleep(1); + + check stopListenerAndClient('listener, 'client); + + test:assertTrue(receivedMessages.indexOf(message) != ()); } diff --git a/ballerina/tests/resources/certsandkeys/ca.crt b/ballerina/tests/resources/certsandkeys/ca.crt old mode 100644 new mode 100755 diff --git a/ballerina/tests/resources/certsandkeys/client-keystore.p12 b/ballerina/tests/resources/certsandkeys/client-keystore.p12 new file mode 100644 index 0000000..2dec13a Binary files /dev/null and b/ballerina/tests/resources/certsandkeys/client-keystore.p12 differ diff --git a/ballerina/tests/resources/certsandkeys/client-trustore.jks b/ballerina/tests/resources/certsandkeys/client-trustore.jks new file mode 100644 index 0000000..9be3c86 Binary files /dev/null and b/ballerina/tests/resources/certsandkeys/client-trustore.jks differ diff --git a/ballerina/tests/resources/certsandkeys/client.crt b/ballerina/tests/resources/certsandkeys/client.crt new file mode 100644 index 0000000..aee9e0c --- /dev/null +++ b/ballerina/tests/resources/certsandkeys/client.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDVzCCAj8CFEZhJ+hWb0a1ny6zZEX6twA30dhCMA0GCSqGSIb3DQEBCwUAMGgx +CzAJBgNVBAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJv +MQ0wCwYDVQQKDARXU08yMRIwEAYDVQQLDAlCYWxsZXJpbmExEjAQBgNVBAMMCWxv +Y2FsaG9zdDAeFw0yMzA3MTQwODQwNTVaFw0yODA2MTcwODQwNTVaMGgxCzAJBgNV +BAYTAkxLMRAwDgYDVQQIDAdXZXN0ZXJuMRAwDgYDVQQHDAdDb2xvbWJvMQ0wCwYD +VQQKDARXU08yMRIwEAYDVQQLDAlCYWxsZXJpbmExEjAQBgNVBAMMCWxvY2FsaG9z +dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAO2Phw+ei1PqXUmAY8ym +tki1ihsuKAxqbPU7vN2IaueWKaPaTSf/7HdaYbaSS8hfO0XQEpYcuZMEuVjLRRZI +PEhxZYwLFJOkr2j4+CFMMqMV670kVIUKe/7Pd+3jNudJaAQBNJ233uDSuuY5kF9x +DU8gSMtKXWgbc+AVGQ10hOeILgP5G0apGL9mMOeVC0Roxc8/BTaac3SDI95LHdRw +dYtwmpFgKUHxqCaffvvFpYH8+X5l7Xt+jf6cEJXCJ0t1pW4T5jnQs15PnNlOp/lx +D2JEDh3VBcDvG/AmEr0RYlUSYRHAvocpw2w05J+MBOIr36VS2Z24KM+8sOpwQmsK +RckCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEARllyzz4K2ISA/4E4TYpgscdofQ18 +jglFLY0WGnao42pT6sxj6n8Y45MdlXvU4GdY/vxSFwv59DfMDN2p7m1DXNSWgcZU +uhNrf0bib6mTbNxxuvwb/8uhGHoCabbl6QVTnjcbVzTn/SduDZFxuTqmTkCWlAKq +ZbuVdM/MqG5GdXW1/mx9MNdBvXCG3E342HJmVjA0hMRpuydN2yVhq3Ch2VdvRehJ +MKVjuzDipfrv7fcFynmaz5VuCQBnDk/Xh+4mp1IOhOkjRqwjASk2IDYjEwXGaKre +acIVeG5PZRJsFpqvq/OYbDWwaq3g4/2tqwKsmDWqRiF7TBOyk9+bBjYUzQ== +-----END CERTIFICATE----- diff --git a/ballerina/tests/resources/certsandkeys/client.key b/ballerina/tests/resources/certsandkeys/client.key new file mode 100644 index 0000000..3659fa1 --- /dev/null +++ b/ballerina/tests/resources/certsandkeys/client.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA7Y+HD56LU+pdSYBjzKa2SLWKGy4oDGps9Tu83Yhq55Ypo9pN +J//sd1phtpJLyF87RdASlhy5kwS5WMtFFkg8SHFljAsUk6SvaPj4IUwyoxXrvSRU +hQp7/s937eM250loBAE0nbfe4NK65jmQX3ENTyBIy0pdaBtz4BUZDXSE54guA/kb +RqkYv2Yw55ULRGjFzz8FNppzdIMj3ksd1HB1i3CakWApQfGoJp9++8Wlgfz5fmXt +e36N/pwQlcInS3WlbhPmOdCzXk+c2U6n+XEPYkQOHdUFwO8b8CYSvRFiVRJhEcC+ +hynDbDTkn4wE4ivfpVLZnbgoz7yw6nBCawpFyQIDAQABAoIBAH0pbq5zCNNYmnfc +Vjh/6XBbfX0ej2kjNW7rX3GUq0aC3kN/A8G8W/jyPKM1ZwoV6qO4oeJHFhr4lTLM +PgUkX+aHIEptmBTF4sk6y3i3rcIxDXgkyQcjY/r3kida3BV1noDlpbQnKR1oyaH8 +2C0A4lM7Hj9FWfs844s/cmidaFyXZkvmjQwaVK8q+gAzodshMtWu3tb23BAOrn4j +L8TsbtewpF2l2F0hhs2mZgrbzxbZEUM8SLvKiEwVshI9jsNrzu3lzKafMKdHWC8r +KW8DwlS8oGhrVfQMbMebgcrF0UnVh9IgLDXhCj0+phu4PNRteelKLzuKJKzDFDbt +zvlL5xUCgYEA+icx1W4e8ECY/fvTZmEMMxLUkAQuFq5A4oL53sGBh+kKu0RnR6Gf +MT2hlknK6WUacwnEp8nQaD8kXO78gz4f2cqAVc6bWb9TxrafcZPZRCGO+YiMYBpn +NmOXdDaBnZBEwBuYP4911fCNPpJqDc2K0OwDqSUyS7bXmQ16SmI1YD8CgYEA8xz8 +O1mhZTpC2AuloiOn6kRqAlMkzT0o8nFcmvw3E4EkbY84/39njtdlkxkYYk8TrfpJ +W0iCh6dEpbxZhYdQfAG6w/7JUVu5Mgszl+uIrJO+v0yERowrywlAhD+F4p7OuoU2 +ZRPK+D0cV+HU/umNGpDbhrLFug08AiW1FtTrV/cCgYArcOUJkG8U/9LRwIqsq8vH +jsyVGfS7AZXJAD/OPEJ3TLQNY0qgOOQBVBeclsIDGAio05hwHu7vO4/gB6yYShCT +7MOuowt9YWl2B77WpXtEhqNimCDwi4VC/7aMeu61YxldVy1wVER9W6HbrOd38Crc +LzBPjmMWnPEB5kJHapikVwKBgQCLoLIdRDU19mg9vTK8FM5z9icMfsQ8PEwwMnuF +aCYZxaouYnvSAlJv77Ye7eLbU59x5LYM/3A4iREcPzkLP9Qx5KxntQXa/Fs5KsXP +Ey/ELnb3V/MAsbDVQ9MSIYm7xonlBm1fUCbau/5zWvkCDxFDmL78deCqIr+8W/vN +ySwBsQKBgQC7Vk2JjkLCiIccfwvubdijosJpkMVS6ib39ePXigOPOqWXAuDKfCch +E4ux1v5CnEmwDmr4mtXjLxKTOsZpevhpb5Lk9IaQLa7axUGQjmiFzZrsgiMZdCgD +624O1NycpUV0e1aib7xlp8BiD2I8YCS48YBI2tg5Zvk/2Uqqsq5IxA== +-----END RSA PRIVATE KEY----- diff --git a/ballerina/tests/resources/docker-compose.yaml b/ballerina/tests/resources/docker-compose.yaml index 9371683..af29406 100644 --- a/ballerina/tests/resources/docker-compose.yaml +++ b/ballerina/tests/resources/docker-compose.yaml @@ -7,10 +7,17 @@ services: container_name: mqtt-test-server ports: - '1883:1883' - - '9001:9001' + - '1884:1884' + - '8883:8883' + - '8884:8884' + - '8887:8887' + - '8888:8888' + - '8889:8889' + - '8890:8890' volumes: - ./mosquitto_password.txt:/mosquitto/passwd_file - ./mosquitto.conf:/mosquitto/config/mosquitto.conf - - ./certsandkeys/ca.crt:/mosquitto/ca.crt + - ./certsandkeys/client.crt:/mosquitto/ca.crt - ./certsandkeys/server.crt:/mosquitto/server.crt - ./certsandkeys/server.key:/mosquitto/server.key + - ./certsandkeys/client.crt:/mosquitto/cacerts/client.crt diff --git a/ballerina/tests/resources/mosquitto.conf b/ballerina/tests/resources/mosquitto.conf index 8518fd6..47bc121 100644 --- a/ballerina/tests/resources/mosquitto.conf +++ b/ballerina/tests/resources/mosquitto.conf @@ -1,6 +1,62 @@ +per_listener_settings true + +# MQTT - anon +listener 1883 +allow_anonymous true +set_tcp_nodelay true + +# MQTT - password +listener 1884 +password_file /mosquitto/passwd_file +set_tcp_nodelay true + +# Encrypted MQTT - No password, Client cert optional +listener 8883 +allow_anonymous true +cafile /mosquitto/ca.crt +keyfile /mosquitto/server.key +certfile /mosquitto/server.crt + +# Encrypted MQTT - No password, Client cert required +listener 8884 +allow_anonymous true +cafile /mosquitto/ca.crt +capath /mosquitto/cacerts +certfile /mosquitto/server.crt +keyfile /mosquitto/server.key +require_certificate true + +# Encrypted MQTT - No password, Server cert expired +#listener 8887 +#allow_anonymous true +#cafile /mosquitto/ca.expired.crt +#certfile /mosquitto/server.expired.crt +#keyfile /mosquitto/server.expired.key + +# Encrypted MQTT - password, Client cert required +listener 8888 +allow_anonymous false +password_file /mosquitto/passwd_file +cafile /mosquitto/ca.crt +keyfile /mosquitto/server.key +certfile /mosquitto/server.crt +require_certificate true + +# Encrypted MQTT - password, Client cert required (2nd channel) +listener 8889 +allow_anonymous false +password_file /mosquitto/passwd_file +cafile /mosquitto/ca.crt +keyfile /mosquitto/server.key +certfile /mosquitto/server.crt +require_certificate true + +# Encrypted MQTT - password, Client cert required (3rd channel) +listener 8890 allow_anonymous false password_file /mosquitto/passwd_file -listener 1883 0.0.0.0 cafile /mosquitto/ca.crt keyfile /mosquitto/server.key certfile /mosquitto/server.crt +require_certificate true + diff --git a/ballerina/tests/test_utilities.bal b/ballerina/tests/test_utilities.bal new file mode 100644 index 0000000..87e15be --- /dev/null +++ b/ballerina/tests/test_utilities.bal @@ -0,0 +1,84 @@ + +const NO_AUTH_ENDPOINT = "tcp://localhost:1883"; +const AUTH_ONLY_ENDPOINT = "tcp://localhost:1884"; +const NO_AUTH_ENCRYPTED_ENDPOINT = "ssl://localhost:8883"; +const NO_AUTH_MTLS_ENDPOINT = "ssl://localhost:8884"; +const NO_AUTH_EXPIRED_ENDPOINT = "ssl://localhost:8887"; +const AUTH_MTLS_ENDPOINT = "ssl://localhost:8888"; +const INVALID_ENDPOINT = "http://localhost:8888"; + +const AUTH_USERNAME = "ballerina"; +const AUTH_PASSWORD = "ballerinamqtt"; + +const INVALID_USERNAME = "mqttuser"; +const INVALID_PASSWORD = "password"; + +const SERVER_CERT_PATH = "tests/resources/certsandkeys/server.crt"; +const CLIENT_CERT_PATH = "tests/resources/certsandkeys/client.crt"; +const CLIENT_KEY_PATH = "tests/resources/certsandkeys/client.key"; +const KEY_PASSWORD = "ballerina"; + +const TRUSTSTORE_PATH = "tests/resources/certsandkeys/client-trustore.jks"; +const TRUSTSTORE_PASSWORD = "ballerina"; + +const KEYSTORE_PATH = "tests/resources/certsandkeys/client-keystore.p12"; +const KEYSTORE_PASSWORD = "ballerina"; + +const INCORRECT_KEYSTORE_PATH = "tests/resources/certsandkeys/invalid-keystore.p12"; +const INCORRECT_KEYSTORE_PASSWORD = "password"; + +final ConnectionConfiguration authConnConfig = { + username: AUTH_USERNAME, + password: AUTH_PASSWORD +}; + +final ConnectionConfiguration tlsConnConfig = { + secureSocket: { + cert: SERVER_CERT_PATH + } +}; + +final ConnectionConfiguration mtlsConnConfig = { + secureSocket: { + cert: SERVER_CERT_PATH, + key: { + certFile: CLIENT_CERT_PATH, + keyFile: CLIENT_KEY_PATH, + keyPassword: KEY_PASSWORD + } + } +}; + +final ConnectionConfiguration mtlsWithTrustKeyStoreConnConfig = { + secureSocket: { + cert: {path: TRUSTSTORE_PATH, password: TRUSTSTORE_PASSWORD}, + key: { + path: KEYSTORE_PATH, + password: KEYSTORE_PASSWORD + }, + protocol: {name: TLS, version: "1.2"} + } +}; + +final ConnectionConfiguration authMtlsConnConfig = { + username: AUTH_USERNAME, + password: AUTH_PASSWORD, + secureSocket: { + cert: SERVER_CERT_PATH, + key: { + certFile: CLIENT_CERT_PATH, + keyFile: CLIENT_KEY_PATH, + keyPassword: KEY_PASSWORD + } + } +}; + +function stopListenerAndClient(Listener? 'listener = (), Client? 'client = ()) returns error? { + if 'client != () { + check 'client->disconnect(); + check 'client->close(); + } + if 'listener != () { + check 'listener.gracefulStop(); + } +} diff --git a/ballerina/types.bal b/ballerina/types.bal index 32bd5ab..95dceba 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -1,3 +1,5 @@ +import ballerina/crypto; + # An MQTT message holds the application payload and other metadata. # # + payload - The payload of the message as a byte array @@ -29,6 +31,15 @@ public type ListenerConfiguration record {| boolean manualAcks = false; |}; +# An MQTTSubscription which contains the topic and the QoS level. +# +# + topic - The topic to subscribe to +# + qos - The QoS level to subscribe at +public type Subscription record {| + string topic; + int qos = 1; +|}; + # The configurations related to the connection initialization of `mqtt:Client` and `mqtt:Listener`. # # + username - The username to use for the connection @@ -36,8 +47,7 @@ public type ListenerConfiguration record {| # + secureSocket - The configurations related to secure communication with the MQTT server # + maxReconnectDelay - The maximum delay between reconnects in milliseconds # + keepAliveInterval - The maximum time interval between messages sent or received in seconds -# + maxInflight - Maximum number of messages that can be sent without receiving acknowledgments -# + connectionTimeout - Maximum time interval in seconds the client will wait for the network connection to the MQTT server to be established +# + connectionTimeout - Maximum time interval in seconds the client will wait for the network connection to the MQTT server to be established # + cleanSession - Whether the client and server should remember state for the client across reconnects # + serverUris - List of serverURIs the client may connect to # + automaticReconnect - Whether the client will automatically attempt to reconnect to the server if the connection is lost @@ -47,7 +57,6 @@ public type ConnectionConfiguration record {| SecureSocket secureSocket?; int maxReconnectDelay?; int keepAliveInterval?; - int maxInflight?; int connectionTimeout?; boolean cleanSession?; string[] serverUris?; @@ -73,9 +82,12 @@ public type DeliveryToken record {| # + key - Combination of certificate and private key of the client # + protocol - Related protocol public type SecureSocket record {| - string cert; - CertKey key?; - Protocol protocol?; + crypto:TrustStore|string cert?; + crypto:KeyStore|CertKey key?; + record {| + Protocol name; + string version; + |} protocol?; |}; # Represents a combination of certificate, private key, and private key password if encrypted. diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 6168799..f2212cf 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -16,6 +16,12 @@ path = "../native/build/libs/mqtt-native-@project.version@.jar" # Azure dependencies [[platform.java11.dependency]] groupId = "org.eclipse.paho" -artifactId = "org.eclipse.paho.client.mqttv3" +artifactId = "org.eclipse.paho.mqttv5.client" version = "@paho.mqtt.version@" -path = "./lib/org.eclipse.paho.client.mqttv3-@paho.mqtt.version@.jar" +path = "./lib/org.eclipse.paho.mqttv5.client-@paho.mqtt.version@.jar" + +[[platform.java11.dependency]] +groupId = "org.bouncycastle" +artifactId = "bcpkix-jdk15on" +version = "@bouncy.castle.version@" +path = "./lib/bcpkix-jdk15on-@bouncy.castle.version@.jar" diff --git a/build.gradle b/build.gradle index 03f7caa..0395308 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ plugins { } ext.ballerinaLangVersion = project.ballerinaLangVersion -ext.pahoMqttVersion = project.pahoMqttVersion +ext.pahoMqtt5Version = project.pahoMqtt5Version allprojects { group = project.group diff --git a/changelog.md b/changelog.md new file mode 100644 index 0000000..e2473c8 --- /dev/null +++ b/changelog.md @@ -0,0 +1,32 @@ +# Change Log +This file contains all the notable changes done to the Ballerina MQTT package through the releases. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] +### Added +- [Add mTLS support](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) + +## [0.1.4] - 2023-07-11 +### Added +- [Add SSL/TLS support for the client and the listener](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) + +## [0.1.3] - 2023-07-06 +### Fixed +- [Make `mqtt:DEFAULT_URL` public](https://github.com/ballerina-platform/ballerina-standard-library/issues/4670) + +## [0.1.2] - 2023-07-05 +### Added +- [Fix `mqtt` name in api docs](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) + +## [0.1.1] - 2023-07-05 +### Added +- [Add API docs](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) + +## [0.1.0] - 2023-07-05 +### Added +- [Add listener client support](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) + +## [0.0.1] - 2023-07-05 +### Added +- [Add publisher client support](https://github.com/ballerina-platform/ballerina-standard-library/issues/4636) diff --git a/gradle.properties b/gradle.properties index ad725af..7ea4355 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=io.xlibb -version=0.1.5-SNAPSHOT +version=0.2.0-SNAPSHOT ballerinaLangVersion=2201.6.0 checkstylePluginVersion=8.18 @@ -10,8 +10,8 @@ downloadPluginVersion=4.0.4 releasePluginVersion=2.6.0 ballerinaGradlePluginVersion=1.0.0 -# Azure dependencies -pahoMqttVersion=1.2.5 +bouncycastleVersion=1.69 +pahoMqtt5Version=1.2.5 #stdlib dependencies diff --git a/native/build.gradle b/native/build.gradle index 5090358..86b53f3 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -10,7 +10,9 @@ dependencies { implementation group: 'org.ballerinalang', name: 'ballerina-lang', version: "${ballerinaLangVersion}" implementation group: 'org.ballerinalang', name: 'ballerina-runtime', version: "${ballerinaLangVersion}" - implementation group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version:"${pahoMqttVersion}" + implementation group: 'io.ballerina.stdlib', name: 'crypto-native', version: "${stdlibCryptoVersion}" + implementation group: 'org.eclipse.paho', name: 'org.eclipse.paho.mqttv5.client', version:"${pahoMqtt5Version}" + implementation group: 'org.bouncycastle', name: 'bcpkix-jdk15on', version:"${bouncycastleVersion}" } def excludePattern = '**/module-info.java' diff --git a/native/src/main/java/io/xlibb/mqtt/caller/CallerActions.java b/native/src/main/java/io/xlibb/mqtt/caller/CallerActions.java index f2550ae..7e59b7c 100644 --- a/native/src/main/java/io/xlibb/mqtt/caller/CallerActions.java +++ b/native/src/main/java/io/xlibb/mqtt/caller/CallerActions.java @@ -1,9 +1,12 @@ package io.xlibb.mqtt.caller; import io.ballerina.runtime.api.values.BObject; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.common.MqttException; +import static io.xlibb.mqtt.utils.MqttConstants.MESSAGE_ID; +import static io.xlibb.mqtt.utils.MqttConstants.QOS; +import static io.xlibb.mqtt.utils.MqttConstants.SUBSCRIBER; import static io.xlibb.mqtt.utils.MqttUtils.createMqttError; /** @@ -12,9 +15,9 @@ public class CallerActions { public static Object complete(BObject callerObject) { - IMqttClient subscriber = (IMqttClient) callerObject.getNativeData("subscriber"); - int messageId = (int) callerObject.getNativeData("messageId"); - int qos = (int) callerObject.getNativeData("qos"); + MqttClient subscriber = (MqttClient) callerObject.getNativeData(SUBSCRIBER); + int messageId = (int) callerObject.getNativeData(MESSAGE_ID); + int qos = (int) callerObject.getNativeData(QOS); try { subscriber.messageArrivedComplete(messageId, qos); } catch (MqttException e) { diff --git a/native/src/main/java/io/xlibb/mqtt/client/ClientActions.java b/native/src/main/java/io/xlibb/mqtt/client/ClientActions.java index e8c2619..ce65f73 100644 --- a/native/src/main/java/io/xlibb/mqtt/client/ClientActions.java +++ b/native/src/main/java/io/xlibb/mqtt/client/ClientActions.java @@ -6,13 +6,16 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import static io.xlibb.mqtt.utils.MqttConstants.CLIENT_OBJECT; +import static io.xlibb.mqtt.utils.MqttConstants.PAYLOAD; +import static io.xlibb.mqtt.utils.MqttConstants.QOS; +import static io.xlibb.mqtt.utils.MqttConstants.RETAINED; import static io.xlibb.mqtt.utils.MqttUtils.createMqttError; import static io.xlibb.mqtt.utils.MqttUtils.getMqttConnectOptions; @@ -24,20 +27,20 @@ public class ClientActions { public static Object externInit(BObject clientObject, BString serverUri, BString clientId, BMap clientConfiguration) { try { - IMqttClient publisher = new MqttClient(serverUri.getValue(), clientId.getValue(), new MemoryPersistence()); - MqttConnectOptions options = getMqttConnectOptions(clientConfiguration); + MqttClient publisher = new MqttClient(serverUri.getValue(), clientId.getValue(), new MemoryPersistence()); + MqttConnectionOptions options = getMqttConnectOptions(clientConfiguration); publisher.connect(options); - clientObject.addNativeData("clientObject", publisher); - } catch (MqttException e) { - return createMqttError(e); + clientObject.addNativeData(CLIENT_OBJECT, publisher); } catch (BError e) { return e; + } catch (Exception e) { + return createMqttError(e); } return null; } public static Object externPublish(BObject clientObject, BString topic, BMap message) { - IMqttClient publisher = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient publisher = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); MqttMessage mqttMessage = generateMqttMessage(message); try { publisher.publish(topic.getValue(), mqttMessage); @@ -48,7 +51,7 @@ public static Object externPublish(BObject clientObject, BString topic, BMap mes } public static Object externClose(BObject clientObject) { - IMqttClient publisher = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient publisher = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { publisher.close(); } catch (MqttException e) { @@ -58,12 +61,12 @@ public static Object externClose(BObject clientObject) { } public static Object externIsConnected(BObject clientObject) { - IMqttClient publisher = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient publisher = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); return publisher.isConnected(); } public static Object externDisconnect(BObject clientObject) { - IMqttClient publisher = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient publisher = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { publisher.disconnect(); } catch (MqttException e) { @@ -73,7 +76,7 @@ public static Object externDisconnect(BObject clientObject) { } public static Object externReconnect(BObject clientObject) { - IMqttClient publisher = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient publisher = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { publisher.reconnect(); } catch (MqttException e) { @@ -84,9 +87,9 @@ public static Object externReconnect(BObject clientObject) { private static MqttMessage generateMqttMessage(BMap message) { MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(((BArray) message.get(StringUtils.fromString("payload"))).getByteArray()); - mqttMessage.setQos(((Long) message.get(StringUtils.fromString("qos"))).intValue()); - mqttMessage.setRetained(((boolean) message.get(StringUtils.fromString("retained")))); + mqttMessage.setPayload(((BArray) message.get(StringUtils.fromString(PAYLOAD))).getByteArray()); + mqttMessage.setQos(((Long) message.get(StringUtils.fromString(QOS))).intValue()); + mqttMessage.setRetained(((boolean) message.get(StringUtils.fromString(RETAINED)))); return mqttMessage; } } diff --git a/native/src/main/java/io/xlibb/mqtt/listener/ListenerActions.java b/native/src/main/java/io/xlibb/mqtt/listener/ListenerActions.java index 5261688..2073748 100644 --- a/native/src/main/java/io/xlibb/mqtt/listener/ListenerActions.java +++ b/native/src/main/java/io/xlibb/mqtt/listener/ListenerActions.java @@ -7,12 +7,17 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttSubscription; +import static io.xlibb.mqtt.utils.MqttConstants.BQOS; +import static io.xlibb.mqtt.utils.MqttConstants.CLIENT_OBJECT; +import static io.xlibb.mqtt.utils.MqttConstants.MANUAL_ACKS; +import static io.xlibb.mqtt.utils.MqttConstants.SERVICE; +import static io.xlibb.mqtt.utils.MqttConstants.TOPIC; import static io.xlibb.mqtt.utils.MqttUtils.createMqttError; import static io.xlibb.mqtt.utils.MqttUtils.getMqttConnectOptions; @@ -24,43 +29,48 @@ public class ListenerActions { public static Object externInit(BObject clientObject, BString serverUri, BString clientId, BMap listenerConfiguration) { try { - - IMqttClient subscriber = new MqttClient(serverUri.getValue(), clientId.getValue(), new MemoryPersistence()); - MqttConnectOptions options = getMqttConnectOptions(listenerConfiguration); - boolean manualAcks = listenerConfiguration.getBooleanValue(StringUtils.fromString("manualAcks")); + MqttClient subscriber = new MqttClient(serverUri.getValue(), clientId.getValue(), new MemoryPersistence()); + MqttConnectionOptions options = getMqttConnectOptions(listenerConfiguration); + boolean manualAcks = listenerConfiguration.getBooleanValue(StringUtils.fromString(MANUAL_ACKS)); subscriber.setManualAcks(manualAcks); subscriber.connect(options); - clientObject.addNativeData("clientObject", subscriber); - } catch (MqttException e) { - return createMqttError(e); + clientObject.addNativeData(CLIENT_OBJECT, subscriber); } catch (BError e) { return e; + } catch (Exception e) { + return createMqttError(e); } return null; } public static Object externAttach(Environment environment, BObject clientObject, BObject service, Object topics) { clientObject.addNativeData("service", service); - IMqttClient subscriber = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient subscriber = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); subscriber.setCallback(new MqttCallbackImpl(environment.getRuntime(), service, subscriber)); return null; } public static Object externDetach(BObject clientObject, BObject service) { - IMqttClient subscriber = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient subscriber = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { subscriber.disconnect(); } catch (MqttException e) { return createMqttError(e); } - clientObject.addNativeData("service", null); + clientObject.addNativeData(SERVICE, null); return null; } - public static Object externStart(BObject clientObject, BArray topics) { - IMqttClient subscriber = (IMqttClient) clientObject.getNativeData("clientObject"); + public static Object externStart(BObject clientObject, BArray subscriptions) { + MqttClient subscriber = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); + MqttSubscription[] mqttSubscriptions = new MqttSubscription[subscriptions.size()]; + for (int i = 0; i < subscriptions.size(); i++) { + BMap topicSubscription = (BMap) subscriptions.getValues()[i]; + mqttSubscriptions[i] = new MqttSubscription(topicSubscription.getStringValue(TOPIC).getValue(), + topicSubscription.getIntValue(BQOS).intValue()); + } try { - subscriber.subscribe(topics.getStringArray()); + subscriber.subscribe(mqttSubscriptions); } catch (MqttException e) { return createMqttError(e); } @@ -68,24 +78,24 @@ public static Object externStart(BObject clientObject, BArray topics) { } public static Object externGracefulStop(BObject clientObject) { - IMqttClient subscriber = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient subscriber = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { subscriber.disconnect(); } catch (MqttException e) { return createMqttError(e); } - clientObject.addNativeData("service", null); + clientObject.addNativeData(SERVICE, null); return null; } public static Object externImmediateStop(BObject clientObject) { - IMqttClient subscriber = (IMqttClient) clientObject.getNativeData("clientObject"); + MqttClient subscriber = (MqttClient) clientObject.getNativeData(CLIENT_OBJECT); try { subscriber.disconnectForcibly(); } catch (MqttException e) { return createMqttError(e); } - clientObject.addNativeData("service", null); + clientObject.addNativeData(SERVICE, null); return null; } diff --git a/native/src/main/java/io/xlibb/mqtt/listener/MqttCallbackImpl.java b/native/src/main/java/io/xlibb/mqtt/listener/MqttCallbackImpl.java index 0ef5d0a..6a7a639 100644 --- a/native/src/main/java/io/xlibb/mqtt/listener/MqttCallbackImpl.java +++ b/native/src/main/java/io/xlibb/mqtt/listener/MqttCallbackImpl.java @@ -14,18 +14,29 @@ import io.ballerina.runtime.api.values.BString; import io.xlibb.mqtt.utils.ModuleUtils; import io.xlibb.mqtt.utils.MqttUtils; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; -import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static io.xlibb.mqtt.utils.ModuleUtils.getModule; +import static io.xlibb.mqtt.utils.MqttConstants.CALLER; +import static io.xlibb.mqtt.utils.MqttConstants.DUPLICATE; +import static io.xlibb.mqtt.utils.MqttConstants.MESSAGE_ID; +import static io.xlibb.mqtt.utils.MqttConstants.ONERROR; +import static io.xlibb.mqtt.utils.MqttConstants.ONMESSAGE; +import static io.xlibb.mqtt.utils.MqttConstants.PAYLOAD; +import static io.xlibb.mqtt.utils.MqttConstants.QOS; +import static io.xlibb.mqtt.utils.MqttConstants.RECORD_MESSAGE; +import static io.xlibb.mqtt.utils.MqttConstants.RETAINED; +import static io.xlibb.mqtt.utils.MqttConstants.SUBSCRIBER; /** * Class containing the callback of Mqtt subscriber. @@ -34,17 +45,23 @@ public class MqttCallbackImpl implements MqttCallback { private final Runtime runtime; private final BObject service; - private final IMqttClient subscriber; + private final MqttClient subscriber; - public MqttCallbackImpl(Runtime runtime, BObject service, IMqttClient subscriber) { + public MqttCallbackImpl(Runtime runtime, BObject service, MqttClient subscriber) { this.runtime = runtime; this.service = service; this.subscriber = subscriber; } @Override - public void connectionLost(Throwable cause) { - BError mqttError = MqttUtils.createMqttError(cause); + public void disconnected(MqttDisconnectResponse disconnectResponse) { + BError mqttError = MqttUtils.createMqttError(disconnectResponse.getException()); + invokeOnError(mqttError); + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + BError mqttError = MqttUtils.createMqttError(exception); invokeOnError(mqttError); } @@ -54,41 +71,34 @@ public void messageArrived(String topic, MqttMessage message) { } @Override - public void deliveryComplete(IMqttDeliveryToken token) { - BMap bMqttMessage; - try { - bMqttMessage = getMqttDeliveryToken(token); - } catch (MqttException e) { - BError bError = MqttUtils.createMqttError(e); - invokeOnError(bError); - return; - } - StrandMetadata metadata = getStrandMetadata("onComplete"); - CountDownLatch latch = new CountDownLatch(1); - runtime.invokeMethodAsyncSequentially(service, "onComplete", null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bMqttMessage, true); - try { - latch.await(100, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - exception.printStackTrace(); - } - } + public void connectComplete(boolean reconnect, String serverURI) {} + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) {} + + @Override + public void deliveryComplete(IMqttToken token) {} private void invokeOnMessage(MqttMessage message) { BMap bMqttMessage = getBMqttMessage(message); - StrandMetadata metadata = getStrandMetadata("onMessage"); + StrandMetadata metadata = getStrandMetadata(ONMESSAGE); CountDownLatch latch = new CountDownLatch(1); boolean callerExists = isCallerAvailable(); + boolean onMessageImplemented = isOnMessageImplemented(); + if (!onMessageImplemented) { + invokeOnError(MqttUtils.createMqttError(new NoSuchMethodException("method onMessage not found"))); + return; + } if (callerExists) { - BObject callerObject = ValueCreator.createObjectValue(getModule(), "Caller"); - callerObject.addNativeData("subscriber", subscriber); - callerObject.addNativeData("messageId", message.getId()); - callerObject.addNativeData("qos", message.getQos()); - runtime.invokeMethodAsyncSequentially(service, "onMessage", null, metadata, + BObject callerObject = ValueCreator.createObjectValue(getModule(), CALLER); + callerObject.addNativeData(SUBSCRIBER, subscriber); + callerObject.addNativeData(MESSAGE_ID, message.getId()); + callerObject.addNativeData(QOS, message.getQos()); + runtime.invokeMethodAsyncSequentially(service, ONMESSAGE, null, metadata, new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bMqttMessage, true, callerObject, true); } else { - runtime.invokeMethodAsyncSequentially(service, "onMessage", null, metadata, + runtime.invokeMethodAsyncSequentially(service, ONMESSAGE, null, metadata, new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bMqttMessage, true); } try { @@ -98,55 +108,58 @@ private void invokeOnMessage(MqttMessage message) { } } + private void invokeOnError(BError bError) { + boolean onErrorImplemented = isOnErrorImplemented(); + if (!onErrorImplemented) { + bError.printStackTrace(); + return; + } + StrandMetadata metadata = getStrandMetadata(ONERROR); + CountDownLatch latch = new CountDownLatch(1); + runtime.invokeMethodAsyncSequentially(service, ONERROR, null, metadata, + new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bError, true); + try { + latch.await(100, TimeUnit.SECONDS); + } catch (InterruptedException exception) { + exception.printStackTrace(); + } + } + + private boolean isOnErrorImplemented() { + Optional onErrorMethodType = getRemoteMethodType(ONERROR); + return onErrorMethodType.isPresent(); + } + + private boolean isOnMessageImplemented() { + Optional onMessageMethodType = getRemoteMethodType(ONMESSAGE); + return onMessageMethodType.isPresent(); + } + private boolean isCallerAvailable() { - Optional onMessageMethodType = getOnMessageMethodType(); + Optional onMessageMethodType = getRemoteMethodType(ONMESSAGE); return onMessageMethodType.isPresent() && onMessageMethodType.get().getType().getParameters().length == 2; } - private Optional getOnMessageMethodType() { + private Optional getRemoteMethodType(String methodName) { RemoteMethodType[] methodTypes = ((ServiceType) service.getOriginalType()).getRemoteMethods(); for (RemoteMethodType methodType: methodTypes) { - if (methodType.getName().equals("onMessage")) { + if (methodType.getName().equals(methodName)) { return Optional.of(methodType); } } return Optional.empty(); } - private void invokeOnError(BError bError) { - StrandMetadata metadata = getStrandMetadata("onError"); - CountDownLatch latch = new CountDownLatch(1); - runtime.invokeMethodAsyncSequentially(service, "onError", null, metadata, - new BServiceInvokeCallbackImpl(latch), null, PredefinedTypes.TYPE_ANY, bError, true); - try { - latch.await(100, TimeUnit.SECONDS); - } catch (InterruptedException exception) { - exception.printStackTrace(); - } - } - private BMap getBMqttMessage(MqttMessage message) { - BMap bMessage = ValueCreator.createRecordValue(getModule(), "Message"); - bMessage.put(StringUtils.fromString("payload"), ValueCreator.createArrayValue(message.getPayload())); - bMessage.put(StringUtils.fromString("messageId"), message.getId()); - bMessage.put(StringUtils.fromString("qos"), message.getQos()); - bMessage.put(StringUtils.fromString("retained"), message.isRetained()); - bMessage.put(StringUtils.fromString("duplicate"), message.isDuplicate()); + BMap bMessage = ValueCreator.createRecordValue(getModule(), RECORD_MESSAGE); + bMessage.put(StringUtils.fromString(PAYLOAD), ValueCreator.createArrayValue(message.getPayload())); + bMessage.put(StringUtils.fromString(MESSAGE_ID), message.getId()); + bMessage.put(StringUtils.fromString(QOS), message.getQos()); + bMessage.put(StringUtils.fromString(RETAINED), message.isRetained()); + bMessage.put(StringUtils.fromString(DUPLICATE), message.isDuplicate()); return bMessage; } - private BMap getMqttDeliveryToken(IMqttDeliveryToken token) throws MqttException { - MqttMessage mqttMessage = token.getMessage(); - BMap bMessage = getBMqttMessage(mqttMessage); - BMap bDeliveryToken = ValueCreator.createRecordValue(getModule(), "DeliveryToken"); - bDeliveryToken.put(StringUtils.fromString("message"), bMessage); - long[] qosArray = Arrays.stream(token.getGrantedQos()).asLongStream().toArray(); - bDeliveryToken.put(StringUtils.fromString("grantedQos"), ValueCreator.createArrayValue(qosArray)); - bDeliveryToken.put(StringUtils.fromString("messageId"), token.getMessageId()); - bDeliveryToken.put(StringUtils.fromString("topics"), StringUtils.fromStringArray(token.getTopics())); - return bDeliveryToken; - } - private StrandMetadata getStrandMetadata(String parentFunctionName) { Module module = ModuleUtils.getModule(); return new StrandMetadata(module.getOrg(), module.getName(), module.getMajorVersion(), parentFunctionName); diff --git a/native/src/main/java/io/xlibb/mqtt/utils/MqttConstants.java b/native/src/main/java/io/xlibb/mqtt/utils/MqttConstants.java new file mode 100644 index 0000000..6402abf --- /dev/null +++ b/native/src/main/java/io/xlibb/mqtt/utils/MqttConstants.java @@ -0,0 +1,62 @@ +package io.xlibb.mqtt.utils; + +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BString; + +/** + * Contains the constant values related in the runtime. + */ +public class MqttConstants { + + public static final BString CONNECTION_CONFIGURATION = StringUtils.fromString("connectionConfig"); + public static final BString USERNAME = StringUtils.fromString("username"); + public static final BString PASSWORD = StringUtils.fromString("password"); + public static final BString MAX_RECONNECT_DELAY = StringUtils.fromString("maxReconnectDelay"); + public static final BString KEEP_ALIVE_INTERVAL = StringUtils.fromString("keepAliveInterval"); + public static final BString CONNECTION_TIMEOUT = StringUtils.fromString("connectionTimeout"); + public static final BString CLEAN_SESSION = StringUtils.fromString("cleanSession"); + public static final BString SERVER_URIS = StringUtils.fromString("serverUris"); + public static final BString AUTOMATIC_RECONNECT = StringUtils.fromString("automaticReconnect"); + public static final BString SECURE_SOCKET = StringUtils.fromString("secureSocket"); + public static final BString CERT = StringUtils.fromString("cert"); + public static final BString KEY = StringUtils.fromString("key"); + public static final BString MESSAGE = StringUtils.fromString("message"); + public static final BString GRANTED_QOS = StringUtils.fromString("grantedQos"); + public static final BString TOPICS = StringUtils.fromString("topics"); + public static final BString TOPIC = StringUtils.fromString("topic"); + public static final BString BQOS = StringUtils.fromString("qos"); + public static final BString CERT_FILE = StringUtils.fromString("certFile"); + public static final BString KEY_FILE = StringUtils.fromString("keyFile"); + public static final BString KEY_PASSWORD = StringUtils.fromString("keyPassword"); + public static final BString KEY_STORE_PASSWORD = StringUtils.fromString("password"); + public static final BString KEY_STORE_PATH = StringUtils.fromString("path"); + public static final BString PROTOCOL_NAME = StringUtils.fromString("name"); + public static final BString PROTOCOL_VERSION = StringUtils.fromString("version"); + + public static final String ERROR_NAME = "Error"; + + public static final String CLIENT_OBJECT = "clientObject"; + public static final String SUBSCRIBER = "subscriber"; + public static final String MESSAGE_ID = "messageId"; + public static final String QOS = "qos"; + public static final String PAYLOAD = "payload"; + public static final String RETAINED = "retained"; + public static final String DUPLICATE = "duplicate"; + public static final String MANUAL_ACKS = "manualAcks"; + public static final String SERVICE = "service"; + public static final String CALLER = "Caller"; + public static final String RECORD_MESSAGE = "Message"; + public static final String RECORD_DELIVERY_TOKEN = "DeliveryToken"; + + public static final String ONCOMPLETE = "onComplete"; + public static final String ONMESSAGE = "onMessage"; + public static final String ONERROR = "onError"; + + public static final BString CRYPTO_TRUSTSTORE_PATH = StringUtils.fromString("path"); + public static final BString CRYPTO_TRUSTSTORE_PASSWORD = StringUtils.fromString("password"); + + public static final String NATIVE_DATA_PUBLIC_KEY_CERTIFICATE = "NATIVE_DATA_PUBLIC_KEY_CERTIFICATE"; + public static final String NATIVE_DATA_PRIVATE_KEY = "NATIVE_DATA_PRIVATE_KEY"; + public static final String DEFAULT_TLS_PROTOCOL = "TLSv1.2"; + +} diff --git a/native/src/main/java/io/xlibb/mqtt/utils/MqttUtils.java b/native/src/main/java/io/xlibb/mqtt/utils/MqttUtils.java index 055a95d..b617305 100644 --- a/native/src/main/java/io/xlibb/mqtt/utils/MqttUtils.java +++ b/native/src/main/java/io/xlibb/mqtt/utils/MqttUtils.java @@ -1,76 +1,101 @@ package io.xlibb.mqtt.utils; import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import io.ballerina.stdlib.crypto.nativeimpl.Decode; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.common.MqttException; import java.io.FileInputStream; -import java.io.IOException; -import java.security.KeyManagementException; +import java.nio.charset.StandardCharsets; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.CertificateFactory; +import java.security.PrivateKey; +import java.security.Security; +import java.security.cert.X509Certificate; +import java.util.Objects; +import java.util.UUID; import javax.net.SocketFactory; +import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import static io.xlibb.mqtt.utils.ModuleUtils.getModule; +import static io.xlibb.mqtt.utils.MqttConstants.AUTOMATIC_RECONNECT; +import static io.xlibb.mqtt.utils.MqttConstants.CERT; +import static io.xlibb.mqtt.utils.MqttConstants.CERT_FILE; +import static io.xlibb.mqtt.utils.MqttConstants.CLEAN_SESSION; +import static io.xlibb.mqtt.utils.MqttConstants.CONNECTION_CONFIGURATION; +import static io.xlibb.mqtt.utils.MqttConstants.CONNECTION_TIMEOUT; +import static io.xlibb.mqtt.utils.MqttConstants.CRYPTO_TRUSTSTORE_PASSWORD; +import static io.xlibb.mqtt.utils.MqttConstants.CRYPTO_TRUSTSTORE_PATH; +import static io.xlibb.mqtt.utils.MqttConstants.DEFAULT_TLS_PROTOCOL; +import static io.xlibb.mqtt.utils.MqttConstants.ERROR_NAME; +import static io.xlibb.mqtt.utils.MqttConstants.KEEP_ALIVE_INTERVAL; +import static io.xlibb.mqtt.utils.MqttConstants.KEY; +import static io.xlibb.mqtt.utils.MqttConstants.KEY_FILE; +import static io.xlibb.mqtt.utils.MqttConstants.KEY_PASSWORD; +import static io.xlibb.mqtt.utils.MqttConstants.KEY_STORE_PASSWORD; +import static io.xlibb.mqtt.utils.MqttConstants.KEY_STORE_PATH; +import static io.xlibb.mqtt.utils.MqttConstants.MAX_RECONNECT_DELAY; +import static io.xlibb.mqtt.utils.MqttConstants.NATIVE_DATA_PRIVATE_KEY; +import static io.xlibb.mqtt.utils.MqttConstants.NATIVE_DATA_PUBLIC_KEY_CERTIFICATE; +import static io.xlibb.mqtt.utils.MqttConstants.PASSWORD; +import static io.xlibb.mqtt.utils.MqttConstants.PROTOCOL_NAME; +import static io.xlibb.mqtt.utils.MqttConstants.PROTOCOL_VERSION; +import static io.xlibb.mqtt.utils.MqttConstants.SECURE_SOCKET; +import static io.xlibb.mqtt.utils.MqttConstants.SERVER_URIS; +import static io.xlibb.mqtt.utils.MqttConstants.USERNAME; /** * Class containing the utility functions related to the clients. */ public class MqttUtils { - public static MqttConnectOptions getMqttConnectOptions(BMap configuration) { - MqttConnectOptions options = new MqttConnectOptions(); - Object connectionConfigObject = configuration.get(StringUtils.fromString("connectionConfig")); + public static MqttConnectionOptions getMqttConnectOptions(BMap configuration) { + MqttConnectionOptions options = new MqttConnectionOptions(); + Object connectionConfigObject = configuration.get(CONNECTION_CONFIGURATION); if (connectionConfigObject != null && connectionConfigObject instanceof BMap) { BMap connectionConfig = (BMap) connectionConfigObject; - Object username = connectionConfig.get(StringUtils.fromString("username")); + Object username = connectionConfig.get(USERNAME); if (username != null) { options.setUserName(((BString) username).getValue()); } - Object password = connectionConfig.get(StringUtils.fromString("password")); + Object password = connectionConfig.get(PASSWORD); if (password != null) { - options.setPassword(((BString) password).getValue().toCharArray()); + options.setPassword(((BString) password).getValue().getBytes(StandardCharsets.UTF_8)); } - Object maxReconnectDelay = connectionConfig.get(StringUtils.fromString("maxReconnectDelay")); + Object maxReconnectDelay = connectionConfig.get(MAX_RECONNECT_DELAY); if (maxReconnectDelay != null) { options.setMaxReconnectDelay(((Long) maxReconnectDelay).intValue()); } - Object keepAliveInterval = connectionConfig.get(StringUtils.fromString("keepAliveInterval")); + Object keepAliveInterval = connectionConfig.get(KEEP_ALIVE_INTERVAL); if (keepAliveInterval != null) { options.setKeepAliveInterval(((Long) keepAliveInterval).intValue()); } - Object maxInflight = connectionConfig.get(StringUtils.fromString("maxInflight")); - if (maxInflight != null) { - options.setMaxInflight(((Long) maxInflight).intValue()); - } - Object connectionTimeout = connectionConfig.get(StringUtils.fromString("connectionTimeout")); + Object connectionTimeout = connectionConfig.get(CONNECTION_TIMEOUT); if (connectionTimeout != null) { options.setConnectionTimeout(((Long) connectionTimeout).intValue()); } - Object cleanSession = connectionConfig.get(StringUtils.fromString("cleanSession")); + Object cleanSession = connectionConfig.get(CLEAN_SESSION); if (cleanSession != null) { - options.setCleanSession((boolean) cleanSession); + options.setCleanStart((boolean) cleanSession); } - Object serverUris = connectionConfig.get(StringUtils.fromString("serverUris")); + Object serverUris = connectionConfig.get(SERVER_URIS); if (serverUris != null) { options.setServerURIs(((BArray) serverUris).getStringArray()); } - Object automaticReconnect = connectionConfig.get(StringUtils.fromString("automaticReconnect")); + Object automaticReconnect = connectionConfig.get(AUTOMATIC_RECONNECT); if (automaticReconnect != null) { options.setAutomaticReconnect((boolean) automaticReconnect); } - Object secureSocket = connectionConfig.get(StringUtils.fromString("secureSocket")); + Object secureSocket = connectionConfig.get(SECURE_SOCKET); if (secureSocket != null) { SocketFactory socketFactory = getSocketFactory((BMap) secureSocket); options.setSocketFactory(socketFactory); @@ -80,35 +105,129 @@ public static MqttConnectOptions getMqttConnectOptions(BMap con } private static SocketFactory getSocketFactory(BMap secureSocket) { - String certPath = secureSocket.getStringValue(StringUtils.fromString("cert")).getValue(); + Object bCert = secureSocket.get(CERT); + BMap keyRecord = (BMap) secureSocket.getMapValue(KEY); + BMap protocol = secureSocket.getMapValue(PROTOCOL_NAME); + String contextProtocol = DEFAULT_TLS_PROTOCOL; + KeyManagerFactory kmf = null; + TrustManagerFactory tmf; + if (Objects.nonNull(protocol)) { + String version = protocol.getStringValue(PROTOCOL_VERSION).getValue(); + String protocolName = protocol.getStringValue(PROTOCOL_NAME).getValue(); + contextProtocol = protocolName + "v" + version; + } try { - KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - trustStore.load(null, null); - trustStore.setCertificateEntry("Custom CA", CertificateFactory.getInstance("X509") - .generateCertificate(new FileInputStream(certPath))); - - TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - tmf.init(trustStore); - TrustManager[] trustManagers = tmf.getTrustManagers(); - - SSLContext sslContext = SSLContext.getInstance("SSL"); - sslContext.init(null, trustManagers, null); + Security.addProvider(new BouncyCastleProvider()); + if (bCert instanceof BString) { + tmf = getTrustManagerFactory((BString) bCert); + } else { + BMap trustStore = (BMap) bCert; + tmf = getTrustManagerFactory(trustStore); + } + if (keyRecord != null) { + if (keyRecord.containsKey(CERT_FILE)) { + BString certFile = keyRecord.get(CERT_FILE); + BString keyFile = keyRecord.get(KEY_FILE); + BString keyPassword = keyRecord.getStringValue(KEY_PASSWORD); + kmf = getKeyManagerFactory(certFile, keyFile, keyPassword); + } else { + kmf = getKeyManagerFactory(keyRecord); + } + } + SSLContext sslContext = SSLContext.getInstance(contextProtocol); + if (Objects.nonNull(kmf)) { + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } else { + sslContext.init(null, tmf.getTrustManagers(), null); + } return sslContext.getSocketFactory(); - } catch (IOException | CertificateException | KeyStoreException | - NoSuchAlgorithmException | KeyManagementException e) { + } catch (Exception e) { throw createMqttError(e); } } - public static BError createMqttError(Exception exception) { - BError cause = ErrorCreator.createError(exception.getCause()); - return ErrorCreator.createError(getModule(), "Error", - StringUtils.fromString(exception.getMessage()), cause, null); + private static KeyManagerFactory getKeyManagerFactory(BMap keyStore) throws Exception { + BString keyStorePath = keyStore.getStringValue(KEY_STORE_PATH); + BString keyStorePassword = keyStore.getStringValue(KEY_STORE_PASSWORD); + KeyStore ks = getKeyStore(keyStorePath, keyStorePassword); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, keyStorePassword.getValue().toCharArray()); + return kmf; + } + + private static KeyManagerFactory getKeyManagerFactory(BString certFile, BString keyFile, BString keyPassword) + throws Exception { + Object publicKey = Decode.decodeRsaPublicKeyFromCertFile(certFile); + if (publicKey instanceof BMap) { + X509Certificate publicCert = (X509Certificate) ((BMap) publicKey).getNativeData( + NATIVE_DATA_PUBLIC_KEY_CERTIFICATE); + Object privateKeyMap = Decode.decodeRsaPrivateKeyFromKeyFile(keyFile, keyPassword); + if (privateKeyMap instanceof BMap) { + PrivateKey privateKey = (PrivateKey) ((BMap) privateKeyMap).getNativeData( + NATIVE_DATA_PRIVATE_KEY); + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, "".toCharArray()); + ks.setKeyEntry(UUID.randomUUID().toString(), privateKey, "".toCharArray(), + new X509Certificate[]{publicCert}); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, "".toCharArray()); + return kmf; + } else { + throw new Exception("Failed to get the private key from Crypto API. " + + ((BError) privateKeyMap).getErrorMessage().getValue()); + } + } else { + throw new Exception("Failed to get the public key from Crypto API. " + + ((BError) publicKey).getErrorMessage().getValue()); + } } - public static BError createMqttError(Throwable throwable) { - BError cause = ErrorCreator.createError(throwable); - return ErrorCreator.createError(getModule(), "Error", - StringUtils.fromString(throwable.getMessage()), cause, null); + private static TrustManagerFactory getTrustManagerFactory(BString cert) throws Exception { + Object publicKeyMap = Decode.decodeRsaPublicKeyFromCertFile(cert); + if (publicKeyMap instanceof BMap) { + X509Certificate x509Certificate = (X509Certificate) ((BMap) publicKeyMap) + .getNativeData(NATIVE_DATA_PUBLIC_KEY_CERTIFICATE); + KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(null, "".toCharArray()); + ts.setCertificateEntry(UUID.randomUUID().toString(), x509Certificate); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + return tmf; + } else { + throw new Exception("Failed to get the public key from Crypto API. " + + ((BError) publicKeyMap).getErrorMessage().getValue()); + } + } + + private static TrustManagerFactory getTrustManagerFactory(BMap trustStore) throws Exception { + BString trustStorePath = trustStore.getStringValue(CRYPTO_TRUSTSTORE_PATH); + BString trustStorePassword = trustStore.getStringValue(CRYPTO_TRUSTSTORE_PASSWORD); + KeyStore ts = getKeyStore(trustStorePath, trustStorePassword); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + return tmf; + } + + private static KeyStore getKeyStore(BString path, BString password) throws Exception { + try (FileInputStream is = new FileInputStream(path.getValue())) { + char[] passphrase = password.getValue().toCharArray(); + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(is, passphrase); + return ks; + } + } + + public static BError createMqttError(Exception exception) { + Throwable cause = exception.getCause(); + BMap errorDetailMap = ValueCreator.createRecordValue(getModule(), "ErrorDetails"); + if (exception instanceof MqttException) { + errorDetailMap.put(StringUtils.fromString("reasonCode"), ((MqttException) exception).getReasonCode()); + } + if (cause != null) { + return ErrorCreator.createError(getModule(), ERROR_NAME, StringUtils.fromString(exception.getMessage()), + ErrorCreator.createError(exception.getCause()), errorDetailMap); + } + return ErrorCreator.createError(getModule(), ERROR_NAME, StringUtils.fromString(exception.getMessage()), + null, errorDetailMap); } } diff --git a/native/src/main/java/module-info.java b/native/src/main/java/module-info.java index 3ae092b..277cad5 100644 --- a/native/src/main/java/module-info.java +++ b/native/src/main/java/module-info.java @@ -1,4 +1,7 @@ -module io.ballerina.stdlib.kafka.runtime { +module io.xlibb.mqtt { requires io.ballerina.runtime; - requires org.eclipse.paho.client.mqttv3; + requires io.ballerina.stdlib.crypto; + requires org.eclipse.paho.mqttv5.client; + requires org.bouncycastle.provider; + requires org.bouncycastle.pkix; }