Skip to content

Commit

Permalink
Align update manager MQTT timeout configurations (#41)
Browse files Browse the repository at this point in the history
[#7] Align update manager configuration with container management configuration

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov3@bosch.io>
  • Loading branch information
dimitar-dimitrow authored Aug 30, 2023
1 parent 5b9e85e commit c8d40ac
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 110 deletions.
24 changes: 12 additions & 12 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func TestNewDefaultConfig(t *testing.T) {
},
MQTT: &mqtt.ConnectionConfig{
Broker: "tcp://localhost:1883",
KeepAlive: 20000,
DisconnectTimeout: 250,
KeepAlive: "20s",
DisconnectTimeout: "250ms",
Username: "",
Password: "",
ConnectTimeout: 30000,
AcknowledgeTimeout: 15000,
SubscribeTimeout: 15000,
UnsubscribeTimeout: 5000,
ConnectTimeout: "30s",
AcknowledgeTimeout: "15s",
SubscribeTimeout: "15s",
UnsubscribeTimeout: "5s",
},
Domain: "device",
ThingsEnabled: true,
Expand Down Expand Up @@ -118,14 +118,14 @@ func TestLoadConfigFromFile(t *testing.T) {
},
MQTT: &mqtt.ConnectionConfig{
Broker: "www",
KeepAlive: 500,
DisconnectTimeout: 500,
KeepAlive: "500ms",
DisconnectTimeout: "500ms",
Username: "username",
Password: "pass",
ConnectTimeout: 500,
AcknowledgeTimeout: 500,
SubscribeTimeout: 500,
UnsubscribeTimeout: 500,
ConnectTimeout: "500ms",
AcknowledgeTimeout: "500ms",
SubscribeTimeout: "500ms",
UnsubscribeTimeout: "500ms",
},
Domain: "mydomain",
ThingsEnabled: false,
Expand Down
12 changes: 6 additions & 6 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func SetupFlags(flagSet *flag.FlagSet, cfg *BaseConfig) {

// init connection flags
flagSet.StringVar(&cfg.MQTT.Broker, "mqtt-conn-broker", EnvToString("MQTT_CONN_BROKER", cfg.MQTT.Broker), "Address of the MQTT server/broker that the update manager will connect for the communication, the format is: scheme://host:port")
flagSet.Int64Var(&cfg.MQTT.KeepAlive, "mqtt-conn-keep-alive", EnvToInt("MQTT_CONN_KEEP_ALIVE", cfg.MQTT.KeepAlive), "Keep alive duration in milliseconds for the MQTT requests")
flagSet.Int64Var(&cfg.MQTT.DisconnectTimeout, "mqtt-conn-disconnect-timeout", EnvToInt("MQTT_CONN_DISCONNECT_TIMEOUT", cfg.MQTT.DisconnectTimeout), "Disconnect timeout in milliseconds for the MQTT server/broker")
flagSet.StringVar(&cfg.MQTT.KeepAlive, "mqtt-conn-keep-alive", EnvToString("MQTT_CONN_KEEP_ALIVE", cfg.MQTT.KeepAlive), "Keep alive duration for the MQTT requests as duration string")
flagSet.StringVar(&cfg.MQTT.DisconnectTimeout, "mqtt-conn-disconnect-timeout", EnvToString("MQTT_CONN_DISCONNECT_TIMEOUT", cfg.MQTT.DisconnectTimeout), "Disconnect timeout for the MQTT server/broker as duration string")
flagSet.StringVar(&cfg.MQTT.Username, "mqtt-conn-username", EnvToString("MQTT_CONN_USERNAME", cfg.MQTT.Username), "Username that is a part of the credentials")
flagSet.StringVar(&cfg.MQTT.Password, "mqtt-conn-password", EnvToString("MQTT_CONN_PASSWORD", cfg.MQTT.Password), "Password that is a part of the credentials")
flagSet.Int64Var(&cfg.MQTT.ConnectTimeout, "mqtt-conn-connect-timeout", EnvToInt("MQTT_CONN_CONNECT_TIMEOUT", cfg.MQTT.ConnectTimeout), "Connect timeout in milliseconds for the MQTT server/broker")
flagSet.Int64Var(&cfg.MQTT.AcknowledgeTimeout, "mqtt-conn-ack-timeout", EnvToInt("MQTT_CONN_ACK_TIMEOUT", cfg.MQTT.AcknowledgeTimeout), "Acknowledge timeout in milliseconds for the MQTT requests")
flagSet.Int64Var(&cfg.MQTT.SubscribeTimeout, "mqtt-conn-sub-timeout", EnvToInt("MQTT_CONN_SUB_TIMEOUT", cfg.MQTT.SubscribeTimeout), "Subscribe timeout in milliseconds for the MQTT requests")
flagSet.Int64Var(&cfg.MQTT.UnsubscribeTimeout, "mqtt-conn-unsub-timeout", EnvToInt("MQTT_CONN_UNSUB_TIMEOUT", cfg.MQTT.UnsubscribeTimeout), "Unsubscribe timeout in milliseconds for the MQTT requests")
flagSet.StringVar(&cfg.MQTT.ConnectTimeout, "mqtt-conn-connect-timeout", EnvToString("MQTT_CONN_CONNECT_TIMEOUT", cfg.MQTT.ConnectTimeout), "Connect timeout for the MQTT server/broker as duration string")
flagSet.StringVar(&cfg.MQTT.AcknowledgeTimeout, "mqtt-conn-ack-timeout", EnvToString("MQTT_CONN_ACK_TIMEOUT", cfg.MQTT.AcknowledgeTimeout), "Acknowledge timeout for the MQTT requests as duration string")
flagSet.StringVar(&cfg.MQTT.SubscribeTimeout, "mqtt-conn-sub-timeout", EnvToString("MQTT_CONN_SUB_TIMEOUT", cfg.MQTT.SubscribeTimeout), "Subscribe timeout for the MQTT requests as duration string")
flagSet.StringVar(&cfg.MQTT.UnsubscribeTimeout, "mqtt-conn-unsub-timeout", EnvToString("MQTT_CONN_UNSUB_TIMEOUT", cfg.MQTT.UnsubscribeTimeout), "Unsubscribe timeout for the MQTT requests as duration string")

flagSet.StringVar(&cfg.Domain, "domain", EnvToString("DOMAIN", cfg.Domain), "Specify the Domain of this update agent, used as MQTT topic prefix.")

Expand Down
12 changes: 6 additions & 6 deletions config/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestSetupFlags(t *testing.T) {
},
"test_flags_mqtt_conn_keep_alive": {
flag: "mqtt-conn-keep-alive",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_mqtt_conn_disconnect_timeout": {
flag: "mqtt-conn-disconnect-timeout",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_mqtt_conn_username": {
flag: "mqtt-conn-username",
Expand All @@ -75,19 +75,19 @@ func TestSetupFlags(t *testing.T) {
},
"test_flags_mqtt-conn-connect-timeout": {
flag: "mqtt-conn-connect-timeout",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_mqtt-conn-ack-timeout": {
flag: "mqtt-conn-ack-timeout",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_mqtt-conn-sub-timeout": {
flag: "mqtt-conn-sub-timeout",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_mqtt-conn-unsub-timeout": {
flag: "mqtt-conn-unsub-timeout",
expectedType: reflect.Int.String(),
expectedType: reflect.String.String(),
},
"test_flags_domain": {
flag: "domain",
Expand Down
12 changes: 6 additions & 6 deletions config/testdata/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
},
"connection": {
"broker":"www",
"keepAlive": 500,
"disconnectTimeout": 500,
"keepAlive": "500ms",
"disconnectTimeout": "500ms",
"username":"username",
"password":"pass",
"connectTimeout": 500,
"acknowledgeTimeout": 500,
"subscribeTimeout": 500,
"unsubscribeTimeout": 500
"connectTimeout": "500ms",
"acknowledgeTimeout": "500ms",
"subscribeTimeout": "500ms",
"unsubscribeTimeout": "500ms"
},
"domain": "mydomain",
"thingsEnabled": false,
Expand Down
24 changes: 12 additions & 12 deletions mqtt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@ package mqtt
const (
// default mqtt connection config
defaultBroker = "tcp://localhost:1883"
defaultKeepAlive = 20000
defaultDisconnectTimeout = 250
defaultKeepAlive = "20s"
defaultDisconnectTimeout = "250ms"
defaultUsername = ""
defaultPassword = ""
defaultConnectTimeout = 30000
defaultAcknowledgeTimeout = 15000
defaultSubscribeTimeout = 15000
defaultUnsubscribeTimeout = 5000
defaultConnectTimeout = "30s"
defaultAcknowledgeTimeout = "15s"
defaultSubscribeTimeout = "15s"
defaultUnsubscribeTimeout = "5s"
)

// ConnectionConfig represents the mqtt client connection config
type ConnectionConfig struct {
Broker string `json:"broker,omitempty"`
KeepAlive int64 `json:"keepAlive,omitempty"`
DisconnectTimeout int64 `json:"disconnectTimeout,omitempty"`
KeepAlive string `json:"keepAlive,omitempty"`
DisconnectTimeout string `json:"disconnectTimeout,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
ConnectTimeout int64 `json:"connectTimeout,omitempty"`
AcknowledgeTimeout int64 `json:"acknowledgeTimeout,omitempty"`
SubscribeTimeout int64 `json:"subscribeTimeout,omitempty"`
UnsubscribeTimeout int64 `json:"unsubscribeTimeout,omitempty"`
ConnectTimeout string `json:"connectTimeout,omitempty"`
AcknowledgeTimeout string `json:"acknowledgeTimeout,omitempty"`
SubscribeTimeout string `json:"subscribeTimeout,omitempty"`
UnsubscribeTimeout string `json:"unsubscribeTimeout,omitempty"`
}

// NewDefaultConfig returns a default mqtt client connection config instance
Expand Down
35 changes: 20 additions & 15 deletions mqtt/desired_state_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ package mqtt

import (
"fmt"
"time"

"github.com/eclipse-kanto/update-manager/api"
"github.com/eclipse-kanto/update-manager/api/types"
"github.com/eclipse-kanto/update-manager/api/util"
"github.com/eclipse-kanto/update-manager/logger"

pahomqtt "github.com/eclipse/paho.mqtt.golang"
Expand Down Expand Up @@ -76,20 +78,18 @@ func (client *desiredStateClient) subscribe() error {
topicFilters[client.topicCurrentState] = 1
topicFilters[client.topicDesiredStateFeedback] = 1
logger.Debug("subscribing for '%v' topics", topicFilters)
subscribeTimeout := convertToMilliseconds(client.mqttConfig.SubscribeTimeout)
token := client.pahoClient.SubscribeMultiple(topicFilters, client.handleMessage)
if !token.WaitTimeout(subscribeTimeout) {
return fmt.Errorf("cannot subscribe for topics '%v' in '%v' seconds", topicFilters, subscribeTimeout)
if !token.WaitTimeout(client.mqttConfig.SubscribeTimeout) {
return fmt.Errorf("cannot subscribe for topics '%v' in '%v'", topicFilters, client.mqttConfig.SubscribeTimeout)
}
return token.Error()
}

func (client *desiredStateClient) unsubscribe() error {
logger.Debug("unsubscribing from '%s' & '%s' topics", client.topicCurrentState, client.topicDesiredStateFeedback)
token := client.pahoClient.Unsubscribe(client.topicCurrentState, client.topicDesiredStateFeedback)
unsubscribeTimeout := convertToMilliseconds(client.mqttConfig.UnsubscribeTimeout)
if !token.WaitTimeout(unsubscribeTimeout) {
return fmt.Errorf("cannot unsubscribe from topic '%s' & '%s' in '%v' seconds", client.topicCurrentState, client.topicDesiredStateFeedback, unsubscribeTimeout)
if !token.WaitTimeout(client.mqttConfig.UnsubscribeTimeout) {
return fmt.Errorf("cannot unsubscribe from topic '%s' & '%s' in '%v'", client.topicCurrentState, client.topicDesiredStateFeedback, client.mqttConfig.UnsubscribeTimeout)
}
return token.Error()
}
Expand Down Expand Up @@ -127,9 +127,8 @@ func (client *desiredStateClient) SendDesiredState(activityID string, desiredSta
return errors.Wrapf(err, "cannot marshal desired state message for activity-id %s", activityID)
}
token := client.pahoClient.Publish(client.topicDesiredState, 1, false, desiredStateBytes)
acknowledgeTimeout := convertToMilliseconds(client.mqttConfig.AcknowledgeTimeout)
if !token.WaitTimeout(acknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v' seconds", client.topicDesiredState, acknowledgeTimeout)
if !token.WaitTimeout(client.mqttConfig.AcknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v'", client.topicDesiredState, client.mqttConfig.AcknowledgeTimeout)
}
return token.Error()
}
Expand All @@ -141,9 +140,8 @@ func (client *desiredStateClient) SendDesiredStateCommand(activityID string, des
return errors.Wrapf(err, "cannot marshal desired state command message for activity-id %s", activityID)
}
token := client.pahoClient.Publish(client.topicDesiredStateCommand, 1, false, desiredStateCommandBytes)
acknowledgeTimeout := convertToMilliseconds(client.mqttConfig.AcknowledgeTimeout)
if !token.WaitTimeout(acknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v' seconds", client.topicDesiredStateCommand, acknowledgeTimeout)
if !token.WaitTimeout(client.mqttConfig.AcknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v'", client.topicDesiredStateCommand, client.mqttConfig.AcknowledgeTimeout)
}
return token.Error()
}
Expand All @@ -155,9 +153,16 @@ func (client *desiredStateClient) SendCurrentStateGet(activityID string) error {
return errors.Wrapf(err, "cannot marshal current state get message for activity-id %s", activityID)
}
token := client.pahoClient.Publish(client.topicCurrentStateGet, 1, false, currentStateGetBytes)
acknowledgeTimeout := convertToMilliseconds(client.mqttConfig.AcknowledgeTimeout)
if !token.WaitTimeout(acknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v' seconds", client.topicCurrentStateGet, acknowledgeTimeout)
if !token.WaitTimeout(client.mqttConfig.AcknowledgeTimeout) {
return fmt.Errorf("cannot publish to topic '%s' in '%v'", client.topicCurrentStateGet, client.mqttConfig.AcknowledgeTimeout)
}
return token.Error()
}

func parseDuration(property, value, defaultValue string) time.Duration {
defaultDuration, err := time.ParseDuration(defaultValue)
if err != nil {
logger.Warn("Cannot parse default duration for property '%s': %s", property, defaultValue)
}
return util.ParseDuration(property, value, defaultDuration, defaultDuration)
}
10 changes: 5 additions & 5 deletions mqtt/desired_state_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDomain(t *testing.T) {
mockPaho := clientsmocks.NewMockClient(mockCtrl)

updateAgentClient := &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
mqttClient: newInternalClient("testDomain", &internalConnectionConfig{}, mockPaho),
}

client, _ := NewDesiredStateClient("testDomain", updateAgentClient)
Expand All @@ -54,13 +54,13 @@ func TestNewDesiredStateClient(t *testing.T) {
}{
"test_update_agent_client": {
client: &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
mqttClient: newInternalClient("testDomain", &internalConnectionConfig{}, mockPaho),
},
},
"test_update_agent_things_client": {
client: &updateAgentThingsClient{
updateAgentClient: &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
mqttClient: newInternalClient("testDomain", &internalConnectionConfig{}, mockPaho),
},
},
},
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestHandleCurrentStateMessage(t *testing.T) {
stateHandler.EXPECT().HandleCurrentState(name, gomock.Any(), testCurrentState).Times(expectedCalls).Return(test.handlerError)

desiredStateClient := &desiredStateClient{
mqttClient: newInternalClient(test.domain, &ConnectionConfig{}, nil),
mqttClient: newInternalClient(test.domain, &internalConnectionConfig{}, nil),
domain: test.domain,
stateHandler: stateHandler,
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestHandleDesiredStateFeedbackMessage(t *testing.T) {
stateHandler.EXPECT().HandleDesiredStateFeedback(name, gomock.Any(), testFeedback).Times(expectedCalls).Return(test.handlerError)

desiredStateClient := &desiredStateClient{
mqttClient: newInternalClient(test.domain, &ConnectionConfig{}, nil),
mqttClient: newInternalClient(test.domain, &internalConnectionConfig{}, nil),
domain: test.domain,
stateHandler: stateHandler,
}
Expand Down
Loading

0 comments on commit c8d40ac

Please sign in to comment.