Skip to content

Commit

Permalink
Added app instance (#18)
Browse files Browse the repository at this point in the history
Added app instance
  • Loading branch information
alei121 authored Feb 21, 2023
1 parent 654f13f commit 92a3602
Show file tree
Hide file tree
Showing 20 changed files with 1,095 additions and 104 deletions.
146 changes: 83 additions & 63 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
)

const (
redeemPath = "/idm/api/v1/appregistry/otp/redeem"
unlinkPath = "/idm/api/v1/appregistry/applications/%s/tenants/%s"
getDevicesPath = "/api/uno/v1/registry/devices"
directModePath = "/api/dxhub/v2/apiproxy/request/%s/direct%s"
newAppInstancePath = "/idm/api/v1/appregistry/otp/new"
redeemPath = "/idm/api/v1/appregistry/otp/redeem"
unlinkPath = "/idm/api/v1/appregistry/applications/%s/tenants/%s"
getDevicesPath = "/api/uno/v1/registry/devices"
directModePath = "/api/dxhub/v2/apiproxy/request/%s/direct%s"
)

var tlsConfig = tls.Config{
Expand Down Expand Up @@ -82,8 +83,12 @@ type Config struct {
Transport *http.Transport

// GetCredentials is used to retrieve the client credentials provided to the app during onboarding
// Either use this or ApiKey
GetCredentials func() (*Credentials, error)

// ApiKey is used when GetCredentials is not specified
ApiKey string

// DeviceActivationHandler notifies when a device is activated
DeviceActivationHandler func(device *Device)

Expand All @@ -105,10 +110,11 @@ type App struct {
deviceMap sync.Map

// Error channel should be used to monitor any errors
Error chan error
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
Error chan error
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
startPubsubConnectOnce sync.Once
}

func (app *App) String() string {
Expand All @@ -131,13 +137,18 @@ func New(config Config) (*App, error) {
httpClient := resty.New().
SetBaseURL(hostURL.String()).
OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error {
credentials, err := config.GetCredentials()
if err != nil {
return err
var key string
if config.GetCredentials != nil {
credentials, err := config.GetCredentials()
if err != nil {
return err
}
key = string(credentials.ApiKey)
zeroByteArray(credentials.ApiKey)
} else {
key = config.ApiKey
}

request.SetHeader("X-Api-Key", string(credentials.ApiKey))
zeroByteArray(credentials.ApiKey)
request.SetHeader("X-Api-Key", key)
return nil
})

Expand All @@ -157,48 +168,6 @@ func New(config Config) (*App, error) {
}

app.ctx, app.ctxCancel = context.WithCancel(context.Background())

app.wg.Add(1)
go func() {
defer app.wg.Done()
backoffFactor := 0
maxbackoffFactor := 3
reconnectBackoff := 30 * time.Second
reconnectDelay := 30 * time.Second

//loop to call app.connect with a reconnect delay with gradual backoff
for {
err := app.connect("")
if err != nil {
log.Logger.Errorf("Failed to connect the app: %v", err)
app.close()
app.reportError(err)
} else {
//obtain the device list
app.loadTenantsDevices()
//reset backoff factor for successful connection
backoffFactor = 0

select {
case err = <-app.conn.Error:
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
}
}

select {
case <-time.After(reconnectDelay + reconnectBackoff*time.Duration(backoffFactor)):
//increment backoff factor by 1 for gradual backoff
if backoffFactor < maxbackoffFactor {
backoffFactor += 1
}
case <-app.ctx.Done():
return
}
}
}()
return app, nil
}

Expand Down Expand Up @@ -279,19 +248,22 @@ func (app *App) close() error {
return nil
}

// connect opens a websocket connection to pxGrid Cloud
// WORKAROUND provide an option to use previous subscription ID
func (app *App) connect(subscriptionID string) error {
// pubsubConnect opens a websocket connection to pxGrid Cloud
func (app *App) pubsubConnect() error {
var err error
app.conn, err = pubsub.NewConnection(pubsub.Config{
GroupID: app.config.GroupID,
Domain: url.PathEscape(app.config.RegionalFQDN),
APIKeyProvider: func() ([]byte, error) {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
if app.config.GetCredentials != nil {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
}
return credentials.ApiKey, e
} else {
return []byte(app.config.ApiKey), nil
}
return credentials.ApiKey, e
},
Transport: app.config.Transport,
})
Expand Down Expand Up @@ -566,6 +538,8 @@ func (app *App) setTenant(tenant *Tenant) error {
}
app.deviceMap.Store(tenant.id, &deviceMapInternal)

// Once a tenant is added, we can start pubsub
app.startPubsubConnect()
return nil
}

Expand All @@ -592,3 +566,49 @@ func (app *App) loadTenantsDevices() {
return true
})
}

func (app *App) startPubsubConnect() {
app.startPubsubConnectOnce.Do(func() {
app.wg.Add(1)
go func() {
defer app.wg.Done()
backoffFactor := 0
maxbackoffFactor := 3
reconnectBackoff := 30 * time.Second
reconnectDelay := 30 * time.Second

//loop to call app.connect with a reconnect delay with gradual backoff
for {
err := app.pubsubConnect()
if err != nil {
log.Logger.Errorf("Failed to connect the app: %v", err)
app.close()
app.reportError(err)
} else {
//obtain the device list
app.loadTenantsDevices()
//reset backoff factor for successful connection
backoffFactor = 0

select {
case err = <-app.conn.Error:
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
}
}

select {
case <-time.After(reconnectDelay + reconnectBackoff*time.Duration(backoffFactor)):
//increment backoff factor by 1 for gradual backoff
if backoffFactor < maxbackoffFactor {
backoffFactor += 1
}
case <-app.ctx.Done():
return
}
}
}()
})
}
10 changes: 8 additions & 2 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
Expand Down Expand Up @@ -57,14 +58,15 @@ var (
type AppTestSuite struct {
suite.Suite
config Config
s *httptest.Server
}

func (suite *AppTestSuite) SetupTest() {
s := test.NewRPCServer(suite.T(), test.Config{
suite.s = test.NewRPCServer(suite.T(), test.Config{
PubSubPath: apiPaths.pubsub,
SubscriptionsPath: apiPaths.subscriptions,
})
u, _ := url.Parse(s.URL)
u, _ := url.Parse(suite.s.URL)

suite.config = Config{
ID: "appId",
Expand All @@ -89,6 +91,10 @@ func (suite *AppTestSuite) SetupTest() {
}
}

func (suite *AppTestSuite) TearDownTest() {
suite.s.Close()
}

func (suite *AppTestSuite) TestNew() {
app, err := New(suite.config)
suite.Nil(err)
Expand Down
92 changes: 92 additions & 0 deletions appinstance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cloud

import (
"errors"

"github.com/rs/xid"
)

type appInstanceRequest struct {
OTP string `json:"otp"`
ParentAppID string `json:"parent_application_id"`
InstanceID string `json:"instance_identifier"`
InstanceName string `json:"instance_name"`
}

type appInstanceResponse struct {
AppID string `json:"app_id"`
AppApiKey string `json:"app_api_key"`
TenantToken string `json:"api_token"`
TenantID string `json:"tenant_id"`
TenantName string `json:"tenant_name"`
}

// LinkTenantWithNewAppInstance redeems the OTP and links a tenant to a new application instance.
// InstanceName will be shown in UI to signify the new application instance.
// The returned App.ID(), App.ApiKey(), Tenant.ID(), Tenant.Name() and Tenant.ApiToken() must be stored securely.
func (app *App) LinkTenantWithNewAppInstance(otp, instanceName string) (*App, *Tenant, error) {
appReq := appInstanceRequest{
OTP: otp,
ParentAppID: app.config.ID,
InstanceID: xid.New().String(),
InstanceName: instanceName,
}

var appResp appInstanceResponse
var errorResp errorResponse

r, err := app.httpClient.R().
SetBody(appReq).
SetResult(&appResp).
SetError(&errorResp).
Post(newAppInstancePath)
if err != nil {
return nil, nil, err
}
if r.IsError() {
return nil, nil, errors.New(errorResp.GetError())
}

appConfig := app.newAppConfig(appResp.AppID, appResp.AppApiKey)

childApp, err := New(appConfig)
if err != nil {
return nil, nil, err
}
tenant, err := childApp.SetTenant(appResp.TenantID, appResp.TenantName, appResp.TenantToken)
if err != nil {
return nil, nil, err
}
return childApp, tenant, nil
}

// SetAppInstance adds an application instance.
// The appID and appApiKey are obtained from calling LinkTenantWithNewAppInstance.
// This should be used by application after restart to reload application instances
func (app *App) SetAppInstance(appID, appApiKey string) (*App, error) {
config := app.newAppConfig(appID, appApiKey)
return New(config)
}

func (app *App) newAppConfig(appID, appApiKey string) Config {
return Config{
ID: appID,
GlobalFQDN: app.config.GlobalFQDN,
RegionalFQDN: app.config.RegionalFQDN,
ReadStreamID: "app--" + appID + "-R",
WriteStreamID: "app--" + appID + "-W",
Transport: app.config.Transport,
ApiKey: appApiKey,
DeviceActivationHandler: app.config.DeviceActivationHandler,
DeviceDeactivationHandler: app.config.DeviceDeactivationHandler,
DeviceMessageHandler: app.config.DeviceMessageHandler,
}
}

func (app *App) ID() string {
return app.config.ID
}

func (app *App) ApiKey() string {
return app.config.ApiKey
}
18 changes: 18 additions & 0 deletions examples/multi-instance/config_sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
app:
id: replace_with_app_id
apiKey: replace_with_api_key
globalFQDN: dnaservices.cisco.com
regionalFQDN: neoffers.cisco.com
readStream: replace_with_read_stream
writeStream: replace_with_write_stream

appInstance:
otp: replace_with_otp_from_dna_cloud
name: Instance 001
# config file will be rewritten with id, key after using OTP
id:
apiKey:
tenant:
id:
name:
token:
Loading

0 comments on commit 92a3602

Please sign in to comment.