Skip to content

Commit

Permalink
Metrics Server: use gRPC connection to get metrics from Operator (ked…
Browse files Browse the repository at this point in the history
  • Loading branch information
zroubalik authored and josephangbc committed Dec 6, 2022
1 parent 3a570ec commit 1819e83
Show file tree
Hide file tree
Showing 100 changed files with 1,914 additions and 470 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Consolidate all exposed Prometheus Metrics in KEDA Operator ([#3919](https://github.com/kedacore/keda/issues/3919))
- **General**: Disable response compression for k8s restAPI in client-go ([#3863](https://github.com/kedacore/keda/issues/3863)). Kubernetes issue for reference (https://github.com/kubernetes/kubernetes/issues/112296)
- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))
- **General:** Introduce new Loki Scaler ([#3699](https://github.com/kedacore/keda/issues/3699))
Expand Down Expand Up @@ -93,6 +94,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Bump Golang to 1.18.6 ([#3205](https://github.com/kedacore/keda/issues/3205))
- **General**: Bump `github.com/Azure/azure-event-hubs-go/v3` ([#2986](https://github.com/kedacore/keda/issues/2986))
- **General**: Migrate from `azure-service-bus-go` to `azservicebus` ([#3394](https://github.com/kedacore/keda/issues/3394))
- **General**: Metrics Server: use gRPC connection to get metrics from Operator ([#3920](https://github.com/kedacore/keda/issues/3920))
- **General**: Metrics Server: use OpenAPI definitions served by custom-metrics-apiserver ([#3929](https://github.com/kedacore/keda/issues/3929))
- **Azure EventHub**: Add e2e tests ([#2792](https://github.com/kedacore/keda/issues/2792))

Expand Down
1 change: 0 additions & 1 deletion CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ KEDA works in conjunction with Kubernetes Horizontal Pod Autoscaler (HPA). When

The return type of this function is `MetricSpec`, but in KEDA's case we will mostly write External metrics. So the property that should be filled is `ExternalMetricSource`, where the:
- `MetricName`: the name of our metric we are returning in this scaler. The name should be unique, to allow setting multiple (even the same type) Triggers in one ScaledObject, but each function call should return the same name.
- `MetricSelector`: //TODO
- `TargetValue`: is the value of the metric we want to reach at all times at all costs. As long as the current metric doesn't match TargetValue, HPA will increase the number of the pods until it reaches the maximum number of pods allowed to scale to.
- `TargetAverageValue`: the value of the metric for which we require one pod to handle. e.g. if we are have a scaler based on the length of a message queue, and we specificy 10 for `TargetAverageValue`, we are saying that each pod will handle 10 messages. So if the length of the queue becomes 30, we expect that we have 3 pods in our cluster. (`TargetAverage` and `TargetValue` are mutually exclusive)

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ clientset-generate: ## Generate client-go clientset, listers and informers.
proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService proto
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=hack LiiklusService.proto --go_out=pkg/scalers/liiklus --go-grpc_out=pkg/scalers/liiklus
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/scalers/externalscaler externalscaler.proto --go_out=pkg/scalers/externalscaler --go-grpc_out=pkg/scalers/externalscaler
PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/metricsservice/api metrics.proto --go_out=pkg/metricsservice/api --go-grpc_out=pkg/metricsservice/api

.PHONY: mockgen-gen
mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go
Expand Down
36 changes: 32 additions & 4 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricsservice"
prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
Expand All @@ -65,6 +66,7 @@ var (
adapterClientRequestBurst int
metricsAPIServerPort int
disableCompression bool
metricsServiceAddr string
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration, maxConcurrentReconciles int) (provider.MetricsProvider, <-chan struct{}, error) {
Expand Down Expand Up @@ -115,6 +117,12 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
}

useMetricsServiceGrpc, err := kedautil.ResolveOsEnvBool("KEDA_USE_METRICS_SERVICE_GRPC", true)
if err != nil {
logger.Error(err, "Invalid KEDA_USE_METRICS_SERVICE_GRPC")
return nil, nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
}

metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MetricsBindAddress: metricsBindAddress,
Expand All @@ -137,13 +145,19 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})

if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
return nil, nil, err
}
return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
Expand All @@ -167,6 +181,19 @@ func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHa
return nil
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
// By default the Metrics Service gRPC Server runs in the same namespace on the keda-operator pod.
func generateDefaultMetricsServiceAddr() string {
const defaultNamespace = "keda"
podNamespace := os.Getenv("POD_NAMESPACE")

if podNamespace == "" {
podNamespace = defaultNamespace
}

return fmt.Sprintf("keda-operator.%s.svc.cluster.local:9666", podNamespace)
}

func printVersion() {
logger.Info(fmt.Sprintf("KEDA Version: %s", version.Version))
logger.Info(fmt.Sprintf("KEDA Commit: %s", version.GitCommit))
Expand Down Expand Up @@ -204,6 +231,7 @@ func main() {
cmd.Flags().IntVar(&metricsAPIServerPort, "port", 8080, "Set the port for the metrics API server")
cmd.Flags().IntVar(&prometheusMetricsPort, "metrics-port", 9022, "Set the port to expose prometheus metrics")
cmd.Flags().StringVar(&prometheusMetricsPath, "metrics-path", "/metrics", "Set the path for the prometheus metrics endpoint")
cmd.Flags().StringVar(&metricsServiceAddr, "metrics-service-address", generateDefaultMetricsServiceAddr(), "The address of the gRPRC Metrics Service Server.")
cmd.Flags().Float32Var(&adapterClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver")
cmd.Flags().IntVar(&adapterClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
cmd.Flags().BoolVar(&disableCompression, "disable-compression", true, "Disable response compression for k8s restAPI in client-go. ")
Expand Down
27 changes: 27 additions & 0 deletions apis/keda/v1alpha1/indentifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2022 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"fmt"
"strings"
)

// GenerateIdentifier returns identifier for the object in form "kind.namespace.name" (lowercase)
func GenerateIdentifier(kind, namespace, name string) string {
return strings.ToLower(fmt.Sprintf("%s.%s.%s", kind, namespace, name))
}
8 changes: 3 additions & 5 deletions apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -89,7 +87,7 @@ func (t *WithTriggers) GetPollingInterval() time.Duration {
return time.Second * time.Duration(defaultPollingInterval)
}

// GenerateIdenitifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdenitifier() string {
return strings.ToLower(fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name))
// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.Kind, t.Namespace, t.Name)
}
Loading

0 comments on commit 1819e83

Please sign in to comment.