Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support querying k8s version with retrying #1130

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/ingress/kube/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ var (
"Total invalid ingresses known to pilot.",
monitoring.WithLabels(clusterTag, invalidType),
)

queryK8sVersionFail = monitoring.NewSum(
"pilot_query_k8s_version_fail",
"query k8s version of remote cluster fail number")
)

func init() {
monitoring.MustRegister(totalIngresses)
monitoring.MustRegister(totalInvalidIngress)
monitoring.MustRegister(queryK8sVersionFail)
}

func RecordIngressNumber(cluster string, number int) {
Expand Down
118 changes: 100 additions & 18 deletions pkg/ingress/kube/common/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,44 @@ import (
"net"
"sort"
"strings"
"time"

networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/kube"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"

netv1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
. "github.com/alibaba/higress/pkg/ingress/log"
)

const (
defaultInterval = 3 * time.Second
defaultTimeout = 1 * time.Minute
)

type retry struct {
interval time.Duration
timeout time.Duration
}

type RetryOption func(o *retry)

func WithInterval(interval time.Duration) RetryOption {
return func(r *retry) {
r.interval = interval
}
}

func WithTimeout(timeout time.Duration) RetryOption {
return func(r *retry) {
r.timeout = timeout
}
}

func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool {
if resource == nil || resource.APIGroup == nil ||
*resource.APIGroup != netv1.SchemeGroupVersion.Group ||
Expand All @@ -42,43 +68,99 @@ func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool {
}

// V1Available check if the "networking/v1" Ingress is available.
func V1Available(client kube.Client) bool {
func V1Available(client kube.Client, retryOptions ...RetryOption) bool {
retry := &retry{
interval: defaultInterval,
timeout: defaultTimeout,
}

for _, option := range retryOptions {
option(retry)
}

// most case is greater than 1.18
supportV1 := true
err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) {
available, err := v1Available(client)
if err != nil {
IngressLog.Errorf("check v1 available error: %v", err)
// retry
return false, nil
}
supportV1 = available
// we have done.
return true, nil
})

if err != nil {
IngressLog.Errorf("check v1 available finally error: %v", err)
}

return supportV1
}

// v1Available check if the "networking/v1" Ingress is available.
func v1Available(client kube.Client) (bool, error) {
// check kubernetes version to use new ingress package or not
version119, _ := version.ParseGeneric("v1.19.0")
return IsRunningVersionAtLeast("v1.19.0", client)
}

// IsRunningVersionAtLeast check if the running version is greater than or equal to the atLeastVersion.
func IsRunningVersionAtLeast(atLeastVersionStr string, client kube.Client) (bool, error) {
atLeastVersion, _ := version.ParseGeneric(atLeastVersionStr)

serverVersion, err := client.GetKubernetesVersion()
if err != nil {
// Consider the new ingress package is available as default
return true
queryK8sVersionFail.Increment()
return false, err
}

runningVersion, err := version.ParseGeneric(serverVersion.String())
if err != nil {
// Consider the new ingress package is available as default
IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
return true
queryK8sVersionFail.Increment()
return false, err
}

return runningVersion.AtLeast(version119)
return runningVersion.AtLeast(atLeastVersion), nil
}

// NetworkingIngressAvailable check if the "networking" group Ingress is available.
func NetworkingIngressAvailable(client kube.Client) bool {
// check kubernetes version to use new ingress package or not
version118, _ := version.ParseGeneric("v1.18.0")
func NetworkingIngressAvailable(client kube.Client, retryOptions ...RetryOption) bool {
retry := &retry{
interval: defaultInterval,
timeout: defaultTimeout,
}

serverVersion, err := client.GetKubernetesVersion()
if err != nil {
return false
for _, option := range retryOptions {
option(retry)
}

runningVersion, err := version.ParseGeneric(serverVersion.String())
// most case is greater than or equal 1.18.
supportNetworking := true

err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) {
available, err := networkingIngressAvailable(client)
if err != nil {
IngressLog.Errorf("check networking available error: %v", err)
// retry
return false, nil
}
supportNetworking = available
// we have done.
return true, nil
})

if err != nil {
IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
return false
IngressLog.Errorf("check networking available finally error: %v", err)
}

return runningVersion.AtLeast(version118)
return supportNetworking
}

// networkingIngressAvailable check if the "networking" group Ingress is available.
func networkingIngressAvailable(client kube.Client) (bool, error) {
// check kubernetes version to use new ingress package or not
return IsRunningVersionAtLeast("v1.18.0", client)
}

// SortIngressByCreationTime sorts the list of config objects in ascending order by their creation time (if available).
Expand Down
107 changes: 107 additions & 0 deletions pkg/ingress/kube/common/tool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package common

import (
"istio.io/istio/pkg/kube"
kubeVersion "k8s.io/apimachinery/pkg/version"
"testing"
"time"

networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
Expand Down Expand Up @@ -556,3 +559,107 @@ func TestSortHTTPRoutesWithMoreRules(t *testing.T) {
}
}
}

type supportV1Client struct {
kube.Client
}

func (s *supportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.28.3-aliyun.1",
}, nil
}

type unSupportV1Client struct {
kube.Client
}

func (u *unSupportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.18.0",
}, nil
}

type supportNetworkingClient struct {
kube.Client
}

func (s *supportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.18.0-aliyun.1",
}, nil
}

type unSupportNetworkingClient struct {
kube.Client
}

func (u *unSupportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "v1.17.0-aliyun.1",
}, nil
}

type errorClient struct {
kube.Client
}

func (e *errorClient) GetKubernetesVersion() (*kubeVersion.Info, error) {
return &kubeVersion.Info{
GitVersion: "error",
}, nil
}

func TestV1Available(t *testing.T) {
fakeClient := kube.NewFakeClient()

v1Client := &supportV1Client{
fakeClient,
}

if !V1Available(v1Client) {
t.Fatal("should support v1")
}

v1Beta1Client := &unSupportV1Client{
fakeClient,
}
if V1Available(v1Beta1Client) {
t.Fatal("should not support v1")
}

errorClient := &errorClient{
fakeClient,
}
// will fallback to v1
if !V1Available(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) {
t.Fatal("should fallback to v1")
}
}

func TestNetworkingIngressAvailable(t *testing.T) {
fakeClient := kube.NewFakeClient()

networkingClient := &supportNetworkingClient{
fakeClient,
}

if !NetworkingIngressAvailable(networkingClient) {
t.Fatal("should support networking")
}

notNetworkingClient := &unSupportNetworkingClient{
fakeClient,
}
if NetworkingIngressAvailable(notNetworkingClient) {
t.Fatal("should not support networking")
}

errorClient := &errorClient{
fakeClient,
}
// will fallback to networking
if !NetworkingIngressAvailable(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) {
t.Fatal("should fallback to networking")
}
}
Loading