Skip to content

Commit

Permalink
Add UpdateManager feature
Browse files Browse the repository at this point in the history
[#21] Integration with the Kanto's cloud connectors

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov3@bosch.io>
  • Loading branch information
dimitar-dimitrow committed Aug 15, 2023
1 parent e711217 commit cca4a26
Show file tree
Hide file tree
Showing 18 changed files with 919 additions and 46 deletions.
6 changes: 6 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ golang/go (1.21.0)
* Project: https://github.com/golang/go
* Source: https://github.com/golang/go/releases/tag/go1.21.0

eclipse/ditto-clients-golang (0.0.0-20230504175246-3e6e17510ac4)

* License: Eclipse Public License v2.0
* Project: https://github.com/eclipse/ditto-clients-golang
* Source: https://github.com/eclipse-ditto/ditto-clients-golang/tree/3e6e17510ac45213afd843639e97b58669e151aa

eclipse/paho.mqtt.golang (1.4.1)

* License: Eclipse Public License - v 2.0 (EPL-2.0)
Expand Down
15 changes: 12 additions & 3 deletions cmd/update-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"log"
"os"

"github.com/eclipse-kanto/update-manager/api"
"github.com/eclipse-kanto/update-manager/cmd/update-manager/app"
"github.com/eclipse-kanto/update-manager/config"
"github.com/eclipse-kanto/update-manager/logger"
Expand All @@ -40,11 +41,19 @@ func main() {
}
defer loggerOut.Close()

client := mqtt.NewUpdateAgentClient(cfg.Domain, cfg.MQTT)
var client api.UpdateAgentClient
if cfg.ThingsEnabled {
client = mqtt.NewUpdateAgentThingsClient(cfg.Domain, cfg.MQTT)
} else {
client = mqtt.NewUpdateAgentClient(cfg.Domain, cfg.MQTT)
}
orchestrator := orchestration.NewUpdateOrchestrator(cfg)
updateManager := orchestration.NewUpdateManager(version, cfg, client, orchestrator)
updateManager, err := orchestration.NewUpdateManager(version, cfg, client, orchestrator)
if err == nil {
err = app.Launch(cfg, client, updateManager)
}

if err := app.Launch(cfg, client, updateManager); err != nil {
if err != nil {
logger.Error("failed to init Update Manager", err, nil)
loggerOut.Close()
os.Exit(1)
Expand Down
12 changes: 7 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (

// BaseConfig represents the common, reusable configuration that holds logger options and MQTT connection parameters.
type BaseConfig struct {
Log *logger.LogConfig `json:"log,omitempty"`
MQTT *mqtt.ConnectionConfig `json:"connection,omitempty"`
Domain string `json:"domain,omitempty"`
Log *logger.LogConfig `json:"log,omitempty"`
MQTT *mqtt.ConnectionConfig `json:"connection,omitempty"`
Domain string `json:"domain,omitempty"`
ThingsEnabled bool `json:"thingsEnabled,omitempty"`
}

// DefaultDomainConfig creates a new configuration filled with default values for all config properties and domain name set to the given parameter.
Expand All @@ -37,8 +38,9 @@ func DefaultDomainConfig(domain string) *BaseConfig {
LogFileCount: logFileCountDefault,
LogFileMaxAge: logFileMaxAgeDefault,
},
MQTT: mqtt.NewDefaultConfig(),
Domain: domain,
MQTT: mqtt.NewDefaultConfig(),
Domain: domain,
ThingsEnabled: true,
}
}

Expand Down
6 changes: 4 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestNewDefaultConfig(t *testing.T) {
SubscribeTimeout: 15000,
UnsubscribeTimeout: 5000,
},
Domain: "device",
Domain: "device",
ThingsEnabled: true,
},
Agents: agentsDefault,
RebootEnabled: true,
Expand Down Expand Up @@ -129,7 +130,8 @@ func TestLoadConfigFromFile(t *testing.T) {
SubscribeTimeout: 500,
UnsubscribeTimeout: 500,
},
Domain: "mydomain",
Domain: "mydomain",
ThingsEnabled: false,
},
Agents: expectedAgentValues,
RebootEnabled: false,
Expand Down
2 changes: 2 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func SetupFlags(flagSet *flag.FlagSet, cfg *BaseConfig) {
flagSet.Int64Var(&cfg.MQTT.UnsubscribeTimeout, "mqtt-conn-unsub-timeout", EnvToInt("MQTT_CONN_UNSUB_TIMEOUT", cfg.MQTT.UnsubscribeTimeout), "Unsubscribe timeout in milliseconds for the MQTT requests")

flagSet.StringVar(&cfg.Domain, "domain", EnvToString("DOMAIN", cfg.Domain), "Specify the Domain of this update agent, used as MQTT topic prefix.")

flagSet.BoolVar(&cfg.ThingsEnabled, "things-enabled", EnvToBool("THINGS_ENABLED", cfg.ThingsEnabled), "Specify whether the UpdateManager will behave as a things.")
}

// ParseConfigFilePath returns the value for configuration file path if set.
Expand Down
1 change: 1 addition & 0 deletions config/testdata/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"unsubscribeTimeout": 500
},
"domain": "mydomain",
"thingsEnabled": false,
"rebootEnabled": false,
"rebootAfter": "1m",
"reportFeedbackInterval": "2m",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
require (
github.com/BurntSushi/toml v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eclipse/ditto-clients-golang v0.0.0-20230504175246-3e6e17510ac4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/ditto-clients-golang v0.0.0-20230504175246-3e6e17510ac4 h1:Z3jNhQFfkUmwyFv8JRnGRn3WCJ9+teLeFhh7rGHYtUo=
github.com/eclipse/ditto-clients-golang v0.0.0-20230504175246-3e6e17510ac4/go.mod h1:ey7YwfHSQJsinGkGbgeEgqZA7qJnoB0YiFVTFEY50Jg=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
14 changes: 11 additions & 3 deletions mqtt/desired_state_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@ type desiredStateClient struct {
}

// NewDesiredStateClient instantiates a new client for triggering MQTT requests.
func NewDesiredStateClient(domain string, updateAgent api.UpdateAgentClient) api.DesiredStateClient {
mqttClient := updateAgent.(*updateAgentClient).mqttClient
func NewDesiredStateClient(domain string, updateAgent api.UpdateAgentClient) (api.DesiredStateClient, error) {
var mqttClient *mqttClient
switch v := updateAgent.(type) {
case *updateAgentClient:
mqttClient = updateAgent.(*updateAgentClient).mqttClient
case *updateAgentThingsClient:
mqttClient = updateAgent.(*updateAgentThingsClient).mqttClient
default:
return nil, fmt.Errorf("Unexpected type: %T", v)
}
return &desiredStateClient{
mqttClient: newInternalClient(domain, mqttClient.mqttConfig, mqttClient.pahoClient),
domain: domain,
}
}, nil
}

func (client *desiredStateClient) Domain() string {
Expand Down
61 changes: 53 additions & 8 deletions mqtt/desired_state_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ package mqtt

import (
"errors"
"fmt"
"testing"

"github.com/eclipse-kanto/update-manager/api"
"github.com/eclipse-kanto/update-manager/api/types"
mqttmocks "github.com/eclipse-kanto/update-manager/mqtt/mocks"
clientsmocks "github.com/eclipse-kanto/update-manager/mqtt/mocks"
"github.com/eclipse-kanto/update-manager/test/mocks"

pahomqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -29,13 +31,56 @@ func TestDomain(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockPaho := mqttmocks.NewMockClient(mockCtrl)
mockPaho := clientsmocks.NewMockClient(mockCtrl)

updateAgentClient := &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
}

assert.Equal(t, "testDomain", NewDesiredStateClient("testDomain", updateAgentClient).Domain())
client, _ := NewDesiredStateClient("testDomain", updateAgentClient)
assert.Equal(t, "testDomain", client.Domain())
}

func TestNewDesiredStateClient(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockPaho := clientsmocks.NewMockClient(mockCtrl)
mockClient := mocks.NewMockUpdateAgentClient(mockCtrl)

tests := map[string]struct {
client api.UpdateAgentClient
err string
}{
"test_update_agent_client": {
client: &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
},
},
"test_update_agent_things_client": {
client: &updateAgentThingsClient{
updateAgentClient: &updateAgentClient{
mqttClient: newInternalClient("testDomain", &ConnectionConfig{}, mockPaho),
},
},
},
"test_error": {
client: mockClient,
err: fmt.Sprintf("Unexpected type: %T", mockClient),
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {

client, err := NewDesiredStateClient("testDomain", test.client)
if test.err != "" {
assert.EqualError(t, err, fmt.Sprintf("Unexpected type: %T", test.client))
} else {
assert.NoError(t, err)
assert.NotNil(t, client)
}
})
}
}

func TestDesiredStateClientStart(t *testing.T) {
Expand Down Expand Up @@ -118,7 +163,7 @@ func TestSendDesiredState(t *testing.T) {
{ID: test.domain},
},
}
desiredStateClient := NewDesiredStateClient(test.domain, &updateAgentClient{
desiredStateClient, _ := NewDesiredStateClient(test.domain, &updateAgentClient{
mqttClient: newInternalClient("testDomain", mqttTestConfig, mockPaho),
})
mockPaho.EXPECT().Publish(test.domain+"update/desiredstate", uint8(1), false, gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -153,7 +198,7 @@ func TestSendDesiredStateCommand(t *testing.T) {
Command: types.CommandDownload,
Baseline: name,
}
desiredStateClient := NewDesiredStateClient(test.domain, &updateAgentClient{
desiredStateClient, _ := NewDesiredStateClient(test.domain, &updateAgentClient{
mqttClient: newInternalClient(test.domain, mqttTestConfig, mockPaho),
})
mockPaho.EXPECT().Publish(test.domain+"update/desiredstate/command", uint8(1), false, gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -184,7 +229,7 @@ func TestSendCurrentStateGet(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
desiredStateClient := NewDesiredStateClient(test.domain, &updateAgentClient{
desiredStateClient, _ := NewDesiredStateClient(test.domain, &updateAgentClient{
mqttClient: newInternalClient(test.domain, mqttTestConfig, mockPaho),
})
mockPaho.EXPECT().Publish(test.domain+"update/currentstate/get", uint8(1), false, gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -213,7 +258,7 @@ func TestHandleCurrentStateMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockMessage := mqttmocks.NewMockMessage(mockCtrl)
mockMessage := clientsmocks.NewMockMessage(mockCtrl)

testCurrentState := &types.Inventory{
SoftwareNodes: []*types.SoftwareNode{
Expand Down Expand Up @@ -256,7 +301,7 @@ func TestHandleDesiredStateFeedbackMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockMessage := mqttmocks.NewMockMessage(mockCtrl)
mockMessage := clientsmocks.NewMockMessage(mockCtrl)

testFeedback := &types.DesiredStateFeedback{
Status: types.StatusIdentified,
Expand Down
24 changes: 12 additions & 12 deletions mqtt/update_agent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (client *updateAgentClient) Stop() error {
return nil
}

func (client *updateAgentClient) onConnect(mqttClient pahomqtt.Client) {
go client.getAndPublishCurrentState()
func (client *updateAgentClient) onConnect(_ pahomqtt.Client) {
go getAndPublishCurrentState(client.Domain(), client.handler.HandleCurrentStateGet)

if err := client.subscribeStateTopics(); err != nil {
logger.ErrorErr(err, "[%s] error subscribing for DesiredState/DesiredStateCommand/CurrentStateGet requests", client.Domain())
Expand Down Expand Up @@ -186,16 +186,6 @@ func (client *updateAgentClient) handleStateRequest(mqttClient pahomqtt.Client,
}
}

func (client *updateAgentClient) getAndPublishCurrentState() {
currentTime := time.Now().UnixNano() / int64(time.Millisecond)
activityID := prefixInitCurrentStateID + strconv.FormatInt(int64(currentTime), 10)
if err := client.handler.HandleCurrentStateGet(activityID, currentTime); err != nil {
logger.ErrorErr(err, "[%s] error processing initial current state get request", client.Domain())
} else {
logger.Debug("[%s] initial current state get request successfully processed", client.Domain())
}
}

// SendCurrentState makes the client create envelope raw bytes with the given activityID and current state inventory and send the raw bytes as current state message.
func (client *updateAgentClient) SendCurrentState(activityID string, currentState *types.Inventory) error {
currentStateBytes, err := types.ToEnvelope(activityID, currentState)
Expand Down Expand Up @@ -254,3 +244,13 @@ func newClient(config *ConnectionConfig, onConnect pahomqtt.OnConnectHandler) pa

return pahomqtt.NewClient(clientOptions)
}

func getAndPublishCurrentState(domain string, currentStateGetHandler func(string, int64) error) {
currentTime := time.Now().UnixNano() / int64(time.Millisecond)
activityID := prefixInitCurrentStateID + strconv.FormatInt(int64(currentTime), 10)
if err := currentStateGetHandler(activityID, currentTime); err != nil {
logger.ErrorErr(err, "[%s] error processing initial current state get request", domain)
} else {
logger.Debug("[%s] initial current state get request successfully processed", domain)
}
}
Loading

0 comments on commit cca4a26

Please sign in to comment.