Skip to content

Commit

Permalink
Multi region support
Browse files Browse the repository at this point in the history
  • Loading branch information
kmadhura committed Jul 23, 2024
1 parent b306dde commit 536236b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 44 deletions.
21 changes: 16 additions & 5 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -233,6 +234,7 @@ func (app *App) Close() error {
for _, connection := range app.conn {
connection.Disconnect()
}
app.conn = nil
}
app.ctxCancel()
app.wg.Wait()
Expand Down Expand Up @@ -265,6 +267,7 @@ func (app *App) close() error {
for _, connection := range app.conn {
connection.Disconnect()
}
app.conn = nil
}

app.tenantMap.Range(func(key interface{}, _ interface{}) bool {
Expand Down Expand Up @@ -649,13 +652,21 @@ func (app *App) startPubsubConnect() {
//reset backoff factor for successful connection
backoffFactor = 0

for _, connection := range app.conn {
select {
case err = <-connection.Error:
cases := make([]reflect.SelectCase, len(app.conn))
for i, connection := range app.conn {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(connection.Error)}
}
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(app.ctx.Done())})
for {
chosen, value, ok := reflect.Select(cases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
cases[chosen].Chan = reflect.ValueOf(nil)
return
} else {
err = fmt.Errorf(value.String())
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion examples/echo-query/config_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ app:
id: replace_with_app_id
apiKey: replace_with_api_key
globalFQDN: dnaservices.cisco.com
regionalFQDN: neoffers.cisco.com
regionalFQDNs:
- neoffers.cisco.com
readStream: replace_with_read_stream
writeStream: replace_with_write_stream

Expand Down
68 changes: 32 additions & 36 deletions examples/echo-query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type appConfig struct {
Id string `yaml:"id"`
ApiKey string `yaml:"apiKey"`
GlobalFQDN string `yaml:"globalFQDN"`
RegionalFQDN string `yaml:"regionalFQDN"`
RegionalFQDNs []string `yaml:"regionalFQDNs"`
ReadStream string `yaml:"readStream"`
WriteStream string `yaml:"writeStream"`
Expand Down Expand Up @@ -109,7 +108,6 @@ func main() {
ID: config.App.Id,
GetCredentials: getCredentials,
GlobalFQDN: config.App.GlobalFQDN,
RegionalFQDN: config.App.RegionalFQDN,
RegionalFQDNs: config.App.RegionalFQDNs,
DeviceActivationHandler: activationHandler,
DeviceDeactivationHandler: deactivationHandler,
Expand Down Expand Up @@ -197,46 +195,44 @@ func main() {
}

// Loop through all the devices of the configured regions
for i := 0; i < len(filteredDevices); i++ {
device := filteredDevices[i]
logger.Infof("Selected device name=%s tenant=%s id=%s region=%s", device.Name(), device.Tenant().Name(), device.ID(), device.Region())

// Setup input
var reader io.Reader
if *file != "" {
f, err := os.Open(*file)
if err != nil {
panic(err)
}
defer f.Close()
reader = f
} else {
reader = os.Stdin
}

if *method == "" {
*method = http.MethodPost
}
if *url == "" {
*url = "/pxgrid/echo/query"
}
device := filteredDevices[0]
logger.Infof("Selected device name=%s tenant=%s id=%s region=%s", device.Name(), device.Tenant().Name(), device.ID(), device.Region())

// Perform api request
req, _ := http.NewRequest(*method, *url, reader)
resp, err := device.Query(req)
// Setup input
var reader io.Reader
if *file != "" {
f, err := os.Open(*file)
if err != nil {
panic(err)
}
defer resp.Body.Close()
defer f.Close()
reader = f
} else {
reader = os.Stdin
}

// Write body to output
n, err := io.Copy(writer, resp.Body)
if err != nil {
panic(err)
}
fmt.Println()
logger.Infof("Query completed. status=%s bodyLen=%d\n", resp.Status, n)
if *method == "" {
*method = http.MethodPost
}
if *url == "" {
*url = "/pxgrid/echo/query"
}

// Perform api request
req, _ := http.NewRequest(*method, *url, reader)
resp, err := device.Query(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()

// Write body to output
n, err := io.Copy(writer, resp.Body)
if err != nil {
panic(err)
}
fmt.Println()
logger.Infof("Query completed. status=%s bodyLen=%d\n", resp.Status, n)

if err = app.Close(); err != nil {
panic(err)
Expand Down
4 changes: 2 additions & 2 deletions tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (t *Tenant) setHttpClient(httpClient *resty.Client) {
func (t *Tenant) setRegionalHttpClients(regionalHttpClients map[string]*resty.Client) {

t.regionalHttpClients = regionalHttpClients
for regionalFQDN := range t.regionalHttpClients {
t.regionalHttpClients[regionalFQDN].OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error {
for _, regionalHttpClient := range t.regionalHttpClients {
regionalHttpClient.OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error {
request.SetHeader("X-API-KEY", t.ApiToken())
return nil
})
Expand Down

0 comments on commit 536236b

Please sign in to comment.