Skip to content

Commit

Permalink
Add support when JetStream cluster not on kubernetes
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammad Fadhlika <git@fadhlika.com>
  • Loading branch information
mfadhlika committed Jan 13, 2023
1 parent be56ebe commit 80efcb9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 34 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Here is an overview of all new **experimental** features:
### Improvements

- **General**: Use (self-signed) certificates for all the communications (internals and externals) ([#3931](https://github.com/kedacore/keda/issues/3931))
- **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052))
- **Redis**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052))
- **NATS Jetstream Scaler:** Add compatibility when cluster not on kubernetes ([#4101](https://github.com/kedacore/keda/issues/4101))

### Fixes

Expand Down
99 changes: 67 additions & 32 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -52,12 +53,8 @@ type jetStreamEndpointResponse struct {
}

type jetStreamServerEndpointResponse struct {
Cluster jetStreamCluster `json:"cluster"`
ServerName string `json:"server_name"`
}

type jetStreamCluster struct {
HostUrls []string `json:"urls"`
HostUrls []string `json:"connect_urls"`
ServerName string `json:"server_name"`
}

type accountDetail struct {
Expand Down Expand Up @@ -214,12 +211,9 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}

if s.metadata.clusterSize > 1 {
// we know who the consumer leader is, query it directly
if s.metadata.consumerLeader != "" {
natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader)
if err != nil {
return err
}
// we know who the consumer leader url is, query it directly
if s.metadata.monitoringLeaderURL != "" {
natsJetStreamMonitoringLeaderURL := s.metadata.monitoringLeaderURL

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
if err != nil {
Expand All @@ -231,32 +225,36 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}

// we haven't found the consumer yet, grab the list of hosts and try each one
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL()
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL("")
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return err
}
for _, clusterURL := range jetStreamServerResp.HostUrls {
// get hostname from the url
// nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local
// or
// 172.0.1.3:4221 -> 172.0.1.3
nodeHostname := strings.Split(clusterURL, ":")[0]
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname)
if err != nil {
return err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return err
}
// Query server info to get its name
jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL)
if err != nil {
return err
}

node := jetStreamServerResp.ServerName

for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls {
node := strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node)
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,6 +318,28 @@ func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() {
s.stream = nil
}

func (s *natsJetStreamScaler) getNATSJetstreamServerInfo(ctx context.Context, natsJetStreamMonitoringServerURL string) (*jetStreamServerEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return nil, err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return nil, err
}

return jetStreamServerResp, nil
}

func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
Expand Down Expand Up @@ -349,22 +369,37 @@ func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, acc
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) {
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL(nodeHostname string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, jsURL.Host), nil

host := jsURL.Host
if nodeHostname != "" {
host = nodeHostname

if jsURL.Port() != "" {
host = net.JoinHostPort(host, jsURL.Port())
}
}

return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, host), nil
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) {
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil

if jsURL.Port() != "" {
nodeHostname = net.JoinHostPort(nodeHostname, jsURL.Port())
}

return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {

mockJetStreamScaler.metadata.monitoringURL = "234234:::::34234234;;;;really_bad_URL;;/"

_, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL()
_, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL("")
if err == nil {
t.Error("Expected error for parsing monitoring server URL but got success")
}
Expand Down

0 comments on commit 80efcb9

Please sign in to comment.