Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added app instance #18

Merged
merged 3 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 83 additions & 63 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
)

const (
redeemPath = "/idm/api/v1/appregistry/otp/redeem"
unlinkPath = "/idm/api/v1/appregistry/applications/%s/tenants/%s"
getDevicesPath = "/api/uno/v1/registry/devices"
directModePath = "/api/dxhub/v2/apiproxy/request/%s/direct%s"
newAppInstancePath = "/idm/api/v1/appregistry/otp/new"
redeemPath = "/idm/api/v1/appregistry/otp/redeem"
unlinkPath = "/idm/api/v1/appregistry/applications/%s/tenants/%s"
getDevicesPath = "/api/uno/v1/registry/devices"
directModePath = "/api/dxhub/v2/apiproxy/request/%s/direct%s"
)

var tlsConfig = tls.Config{
Expand Down Expand Up @@ -82,8 +83,12 @@ type Config struct {
Transport *http.Transport

// GetCredentials is used to retrieve the client credentials provided to the app during onboarding
// Either use this or ApiKey
GetCredentials func() (*Credentials, error)

// ApiKey is used when GetCredentials is not specified
ApiKey string

// DeviceActivationHandler notifies when a device is activated
DeviceActivationHandler func(device *Device)

Expand All @@ -105,10 +110,11 @@ type App struct {
deviceMap sync.Map

// Error channel should be used to monitor any errors
Error chan error
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
Error chan error
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
startPubsubConnectOnce sync.Once
}

func (app *App) String() string {
Expand All @@ -131,13 +137,18 @@ func New(config Config) (*App, error) {
httpClient := resty.New().
SetBaseURL(hostURL.String()).
OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error {
credentials, err := config.GetCredentials()
if err != nil {
return err
var key string
if config.GetCredentials != nil {
credentials, err := config.GetCredentials()
if err != nil {
return err
}
key = string(credentials.ApiKey)
zeroByteArray(credentials.ApiKey)
} else {
key = config.ApiKey
}

request.SetHeader("X-Api-Key", string(credentials.ApiKey))
zeroByteArray(credentials.ApiKey)
request.SetHeader("X-Api-Key", key)
return nil
})

Expand All @@ -157,48 +168,6 @@ func New(config Config) (*App, error) {
}

app.ctx, app.ctxCancel = context.WithCancel(context.Background())

app.wg.Add(1)
go func() {
defer app.wg.Done()
backoffFactor := 0
maxbackoffFactor := 3
reconnectBackoff := 30 * time.Second
reconnectDelay := 30 * time.Second

//loop to call app.connect with a reconnect delay with gradual backoff
for {
err := app.connect("")
if err != nil {
log.Logger.Errorf("Failed to connect the app: %v", err)
app.close()
app.reportError(err)
} else {
//obtain the device list
app.loadTenantsDevices()
//reset backoff factor for successful connection
backoffFactor = 0

select {
case err = <-app.conn.Error:
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
}
}

select {
case <-time.After(reconnectDelay + reconnectBackoff*time.Duration(backoffFactor)):
//increment backoff factor by 1 for gradual backoff
if backoffFactor < maxbackoffFactor {
backoffFactor += 1
}
case <-app.ctx.Done():
return
}
}
}()
return app, nil
}

Expand Down Expand Up @@ -279,19 +248,22 @@ func (app *App) close() error {
return nil
}

// connect opens a websocket connection to pxGrid Cloud
// WORKAROUND provide an option to use previous subscription ID
func (app *App) connect(subscriptionID string) error {
// pubsubConnect opens a websocket connection to pxGrid Cloud
func (app *App) pubsubConnect() error {
var err error
app.conn, err = pubsub.NewConnection(pubsub.Config{
GroupID: app.config.GroupID,
Domain: url.PathEscape(app.config.RegionalFQDN),
APIKeyProvider: func() ([]byte, error) {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
if app.config.GetCredentials != nil {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
}
return credentials.ApiKey, e
} else {
return []byte(app.config.ApiKey), nil
}
return credentials.ApiKey, e
},
Transport: app.config.Transport,
})
Expand Down Expand Up @@ -566,6 +538,8 @@ func (app *App) setTenant(tenant *Tenant) error {
}
app.deviceMap.Store(tenant.id, &deviceMapInternal)

// Once a tenant is added, we can start pubsub
app.startPubsubConnect()
return nil
}

Expand All @@ -592,3 +566,49 @@ func (app *App) loadTenantsDevices() {
return true
})
}

func (app *App) startPubsubConnect() {
app.startPubsubConnectOnce.Do(func() {
app.wg.Add(1)
go func() {
defer app.wg.Done()
backoffFactor := 0
maxbackoffFactor := 3
reconnectBackoff := 30 * time.Second
reconnectDelay := 30 * time.Second

//loop to call app.connect with a reconnect delay with gradual backoff
for {
err := app.pubsubConnect()
if err != nil {
log.Logger.Errorf("Failed to connect the app: %v", err)
app.close()
app.reportError(err)
} else {
//obtain the device list
app.loadTenantsDevices()
//reset backoff factor for successful connection
backoffFactor = 0

select {
case err = <-app.conn.Error:
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
}
}

select {
case <-time.After(reconnectDelay + reconnectBackoff*time.Duration(backoffFactor)):
//increment backoff factor by 1 for gradual backoff
if backoffFactor < maxbackoffFactor {
backoffFactor += 1
}
case <-app.ctx.Done():
return
}
}
}()
})
}
10 changes: 8 additions & 2 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
Expand Down Expand Up @@ -57,14 +58,15 @@ var (
type AppTestSuite struct {
suite.Suite
config Config
s *httptest.Server
}

func (suite *AppTestSuite) SetupTest() {
s := test.NewRPCServer(suite.T(), test.Config{
suite.s = test.NewRPCServer(suite.T(), test.Config{
PubSubPath: apiPaths.pubsub,
SubscriptionsPath: apiPaths.subscriptions,
})
u, _ := url.Parse(s.URL)
u, _ := url.Parse(suite.s.URL)

suite.config = Config{
ID: "appId",
Expand All @@ -89,6 +91,10 @@ func (suite *AppTestSuite) SetupTest() {
}
}

func (suite *AppTestSuite) TearDownTest() {
suite.s.Close()
}

func (suite *AppTestSuite) TestNew() {
app, err := New(suite.config)
suite.Nil(err)
Expand Down
92 changes: 92 additions & 0 deletions appinstance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cloud

import (
"errors"

"github.com/rs/xid"
)

type appInstanceRequest struct {
OTP string `json:"otp"`
ParentAppID string `json:"parent_application_id"`
InstanceID string `json:"instance_identifier"`
InstanceName string `json:"instance_name"`
}

type appInstanceResponse struct {
AppID string `json:"app_id"`
AppApiKey string `json:"app_api_key"`
TenantToken string `json:"api_token"`
TenantID string `json:"tenant_id"`
TenantName string `json:"tenant_name"`
}

// LinkTenantWithNewAppInstance redeems the OTP and links a tenant to a new application instance.
// InstanceName will be shown in UI to signify the new application instance.
// The returned App.ID(), App.ApiKey(), Tenant.ID(), Tenant.Name() and Tenant.ApiToken() must be stored securely.
func (app *App) LinkTenantWithNewAppInstance(otp, instanceName string) (*App, *Tenant, error) {
appReq := appInstanceRequest{
OTP: otp,
ParentAppID: app.config.ID,
InstanceID: xid.New().String(),
InstanceName: instanceName,
}

var appResp appInstanceResponse
var errorResp errorResponse

r, err := app.httpClient.R().
SetBody(appReq).
SetResult(&appResp).
SetError(&errorResp).
Post(newAppInstancePath)
if err != nil {
return nil, nil, err
}
if r.IsError() {
return nil, nil, errors.New(errorResp.GetError())
}

appConfig := app.newAppConfig(appResp.AppID, appResp.AppApiKey)

childApp, err := New(appConfig)
if err != nil {
return nil, nil, err
}
tenant, err := childApp.SetTenant(appResp.TenantID, appResp.TenantName, appResp.TenantToken)
if err != nil {
return nil, nil, err
}
return childApp, tenant, nil
}

// SetAppInstance adds an application instance.
// The appID and appApiKey are obtained from calling LinkTenantWithNewAppInstance.
// This should be used by application after restart to reload application instances
func (app *App) SetAppInstance(appID, appApiKey string) (*App, error) {
config := app.newAppConfig(appID, appApiKey)
return New(config)
}

func (app *App) newAppConfig(appID, appApiKey string) Config {
return Config{
ID: appID,
GlobalFQDN: app.config.GlobalFQDN,
RegionalFQDN: app.config.RegionalFQDN,
ReadStreamID: "app--" + appID + "-R",
WriteStreamID: "app--" + appID + "-W",
Transport: app.config.Transport,
ApiKey: appApiKey,
DeviceActivationHandler: app.config.DeviceActivationHandler,
DeviceDeactivationHandler: app.config.DeviceDeactivationHandler,
DeviceMessageHandler: app.config.DeviceMessageHandler,
}
}

func (app *App) ID() string {
return app.config.ID
}

func (app *App) ApiKey() string {
return app.config.ApiKey
}
18 changes: 18 additions & 0 deletions examples/multi-instance/config_sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
app:
id: replace_with_app_id
apiKey: replace_with_api_key
globalFQDN: dnaservices.cisco.com
regionalFQDN: neoffers.cisco.com
readStream: replace_with_read_stream
writeStream: replace_with_write_stream

appInstance:
otp: replace_with_otp_from_dna_cloud
name: Instance 001
# config file will be rewritten with id, key after using OTP
id:
apiKey:
tenant:
id:
name:
token:
Loading