From 80efcb952e88ccb0631453a6341d9fab2c01b407 Mon Sep 17 00:00:00 2001 From: Muhammad Fadhlika Date: Fri, 13 Jan 2023 14:21:26 +0700 Subject: [PATCH] Add support when JetStream cluster not on kubernetes Signed-off-by: Muhammad Fadhlika --- CHANGELOG.md | 3 +- pkg/scalers/nats_jetstream_scaler.go | 99 +++++++++++++++-------- pkg/scalers/nats_jetstream_scaler_test.go | 2 +- 3 files changed, 70 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75d736dc449..f99bd736636 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,7 +59,8 @@ Here is an overview of all new **experimental** features: ### Improvements - **General**: Use (self-signed) certificates for all the communications (internals and externals) ([#3931](https://github.com/kedacore/keda/issues/3931)) -- **Redis Scalers**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052)) +- **Redis**: Add support to Redis 7 ([#4052](https://github.com/kedacore/keda/issues/4052)) +- **NATS Jetstream Scaler:** Add compatibility when cluster not on kubernetes ([#4101](https://github.com/kedacore/keda/issues/4101)) ### Fixes diff --git a/pkg/scalers/nats_jetstream_scaler.go b/pkg/scalers/nats_jetstream_scaler.go index 06e4dca95ca..df266432b48 100644 --- a/pkg/scalers/nats_jetstream_scaler.go +++ b/pkg/scalers/nats_jetstream_scaler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "net/http" "net/url" "strconv" @@ -52,12 +53,8 @@ type jetStreamEndpointResponse struct { } type jetStreamServerEndpointResponse struct { - Cluster jetStreamCluster `json:"cluster"` - ServerName string `json:"server_name"` -} - -type jetStreamCluster struct { - HostUrls []string `json:"urls"` + HostUrls []string `json:"connect_urls"` + ServerName string `json:"server_name"` } type accountDetail struct { @@ -214,12 +211,9 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context } if s.metadata.clusterSize > 1 { - // we know who the consumer leader is, query it directly - if s.metadata.consumerLeader != "" { - natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader) - if err != nil { - return err - } + // we know who the consumer leader url is, query it directly + if s.metadata.monitoringLeaderURL != "" { + natsJetStreamMonitoringLeaderURL := s.metadata.monitoringLeaderURL jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL) if err != nil { @@ -231,32 +225,36 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context } // we haven't found the consumer yet, grab the list of hosts and try each one - natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL() + natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL("") if err != nil { return err } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil) + jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL) if err != nil { return err } - resp, err := s.httpClient.Do(req) - if err != nil { - s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL) - return err - } + for _, clusterURL := range jetStreamServerResp.HostUrls { + // get hostname from the url + // nats-1.nats.svc.cluster.local:4221 -> nats-1.nats.svc.cluster.local + // or + // 172.0.1.3:4221 -> 172.0.1.3 + nodeHostname := strings.Split(clusterURL, ":")[0] + natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL(nodeHostname) + if err != nil { + return err + } - defer resp.Body.Close() - var jetStreamServerResp *jetStreamServerEndpointResponse - if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil { - s.logger.Error(err, "unable to decode NATS JetStream server details") - return err - } + // Query server info to get its name + jetStreamServerResp, err := s.getNATSJetstreamServerInfo(ctx, natsJetStreamMonitoringServerURL) + if err != nil { + return err + } + + node := jetStreamServerResp.ServerName - for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls { - node := strings.Split(clusterURL, ".")[0] - natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node) + natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(nodeHostname) if err != nil { return err } @@ -320,6 +318,28 @@ func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() { s.stream = nil } +func (s *natsJetStreamScaler) getNATSJetstreamServerInfo(ctx context.Context, natsJetStreamMonitoringServerURL string) (*jetStreamServerEndpointResponse, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil) + if err != nil { + return nil, err + } + + resp, err := s.httpClient.Do(req) + if err != nil { + s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL) + return nil, err + } + + defer resp.Body.Close() + var jetStreamServerResp *jetStreamServerEndpointResponse + if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil { + s.logger.Error(err, "unable to decode NATS JetStream server details") + return nil, err + } + + return jetStreamServerResp, nil +} + func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil) if err != nil { @@ -349,22 +369,37 @@ func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, acc return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account) } -func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) { +func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL(nodeHostname string) (string, error) { jsURL, err := url.Parse(s.metadata.monitoringURL) if err != nil { s.logger.Error(err, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL) return "", err } - return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, jsURL.Host), nil + + host := jsURL.Host + if nodeHostname != "" { + host = nodeHostname + + if jsURL.Port() != "" { + host = net.JoinHostPort(host, jsURL.Port()) + } + } + + return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, host), nil } -func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) { +func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(nodeHostname string) (string, error) { jsURL, err := url.Parse(s.metadata.monitoringURL) if err != nil { s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL) return "", err } - return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil + + if jsURL.Port() != "" { + nodeHostname = net.JoinHostPort(nodeHostname, jsURL.Port()) + } + + return fmt.Sprintf("%s://%s%s?%s", jsURL.Scheme, nodeHostname, jsURL.Path, jsURL.RawQuery), nil } func (s *natsJetStreamScaler) getMaxMsgLag() int64 { diff --git a/pkg/scalers/nats_jetstream_scaler_test.go b/pkg/scalers/nats_jetstream_scaler_test.go index eeafe020d23..549d123e935 100644 --- a/pkg/scalers/nats_jetstream_scaler_test.go +++ b/pkg/scalers/nats_jetstream_scaler_test.go @@ -431,7 +431,7 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) { mockJetStreamScaler.metadata.monitoringURL = "234234:::::34234234;;;;really_bad_URL;;/" - _, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL() + _, err = mockJetStreamScaler.getNATSJetStreamMonitoringServerURL("") if err == nil { t.Error("Expected error for parsing monitoring server URL but got success") }