Skip to content

Commit

Permalink
Upgrade tests with lots of resources (#2239)
Browse files Browse the repository at this point in the history
* Upgrade Stress tests for control plane

Deploying different Knative Serving and Eventing resources multiple
times, performing an upgrade and checking that the resources are still
ready.

* TMP: kitchensink upgrades actually run stress tests

* Also check Serving, Eventing, Kafka version during upgrade

* Verify post-install jobs

* Fix make target

* NumDeployments == 3

Works with cluster scaled to 5

With NumDeployments == 4 the Nodes get unready

* Verify Pod restarts in system namespaces

* Record memory usage

* Upgrade stress tests complete

* Fix adding Serverless upgrade operation

* Wait 2 minutes before checking

* Move namespace consts to top-level test package

* Do not call dev.sh

* Cleanup

* Ignore version-migrator when counting pod restarts

* Add KafkaSink to kitchensink tests

* PingSource ContainerSource ApiServerSource

ApiServerSource unfinished

* Vendor ApiServerSource from eventing

* ApiServerSource finished

* Define components for Sources

* SourceFeatureSet

* Pass images.producer.file to all kitchensink tests

* Use apiserversource.WithSink

* Properly install KafkaTopic

* Remove test change

* Fix lint

* Reduce number of triggers in short mode

* Deploy sink only with one source

* Use inMemoryChannel as sink for sources

* Ignore storage version migration pods

They have different names in Eventing/Serving and EKB:
storage-version-migration-eventing-
storage-version-migration-serving-
knative-kafka-storage-version-migrator

* Prevent re-initializing flags from knative.dev/pkg/test

* Track namespace together with Pod for memory consumption

* Force garbage collection

* TMP: Use GODEBUG=gctrace=1 for serving controller

* Induce failure to get GC logs before upgrade

* Set GOGC to 20 to trigger GC more often

* Run complete test with GOGC=20 for controller

* Enable profiling temporarily

* Do not compare memory consumption

It's not reliable through this metric

* Remove unused const

* Disable profiling for Serving

* Remove temporary alias in Makefile
  • Loading branch information
mgencur authored Sep 18, 2023
1 parent 01076b8 commit 60de240
Show file tree
Hide file tree
Showing 30 changed files with 907 additions and 162 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ test-kitchensink-upgrade:
test-kitchensink-upgrade-testonly:
./test/kitchensink-upgrade-tests.sh

# Prepare a cluster for the kitchensink upgrade stress tests and run them:
# install Strimzi (kept after the run), install the previous Serverless
# version with Kafka on a cluster scaled up to 5 workers, then execute the
# upgrade stress test suite.
test-kitchensink-upgrade-stress:
	UNINSTALL_STRIMZI=false ./hack/strimzi.sh
	INSTALL_PREVIOUS_VERSION=true INSTALL_KAFKA=true SCALE_UP=5 ./hack/install.sh
	./test/kitchensink-upgrade-stress-tests.sh

# Run only the kitchensink upgrade stress tests, assuming the cluster has
# already been prepared (e.g. by the target above).
test-kitchensink-upgrade-stress-testonly:
	./test/kitchensink-upgrade-stress-tests.sh

# Run Console UI e2e tests.
test-ui-e2e-testonly:
./test/ui-e2e-tests.sh
Expand Down
16 changes: 10 additions & 6 deletions test/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ import (
kafkaversioned "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned"
)

const operatorNamesapce = "openshift-serverless"
const (
	// ServingNamespace is the namespace Knative Serving is deployed into.
	ServingNamespace = "knative-serving"
	// EventingNamespace is the namespace Knative Eventing is deployed into.
	EventingNamespace = "knative-eventing"
	// IngressNamespace is the namespace holding the Serving ingress components.
	IngressNamespace = "knative-serving-ingress"
)

// Context holds objects related to test execution
type Context struct {
Expand Down Expand Up @@ -194,14 +198,14 @@ func (c *Context) AddToCleanup(f CleanupFunc) {
func (c *Context) DeleteOperatorPods(ctx context.Context) error {
pods, err := c.Clients.Kube.
CoreV1().
Pods(operatorNamesapce).
Pods(OperatorsNamespace).
List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list pods in %s", operatorNamesapce)
return fmt.Errorf("failed to list pods in %s", OperatorsNamespace)
}

for _, p := range pods.Items {
if err := c.Clients.Kube.CoreV1().Pods(operatorNamesapce).Delete(ctx, p.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
if err := c.Clients.Kube.CoreV1().Pods(OperatorsNamespace).Delete(ctx, p.GetName(), metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
Expand All @@ -213,10 +217,10 @@ func (c *Context) WaitForOperatorPodsReady(ctx context.Context) error {
return wait.PollImmediate(Interval, Timeout, func() (done bool, err error) {
pods, err := c.Clients.Kube.
CoreV1().
Pods(operatorNamesapce).
Pods(OperatorsNamespace).
List(ctx, metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("failed to list pods in %s", operatorNamesapce)
return false, fmt.Errorf("failed to list pods in %s", OperatorsNamespace)
}

// Verify pod readiness
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/knative_eventing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
eventingNamespace = "knative-eventing"
eventingNamespace = test.EventingNamespace
eventingHaReplicas = 2
)

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/knative_serving_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
servingNamespace = "knative-serving"
servingNamespace = test.ServingNamespace
haReplicas = 2
)

Expand Down
4 changes: 2 additions & 2 deletions test/e2ekafka/knative_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

const (
eventingName = "knative-eventing"
eventingNamespace = "knative-eventing"
knativeKafkaNamespace = "knative-eventing"
eventingNamespace = test.EventingNamespace
knativeKafkaNamespace = test.EventingNamespace
defaultNamespace = "default"
)

Expand Down
3 changes: 2 additions & 1 deletion test/eventinge2erekt/features/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/openshift-knative/serverless-operator/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -53,7 +54,7 @@ func VerifyEncryptedTrafficToActivator(refs []corev1.ObjectReference, since time
// When running within Mesh a mesh-specific VirtualService is used which
// gets istio-ingressgateway out of the path.
logFilter := LogFilter{
PodNamespace: "knative-serving",
PodNamespace: test.ServingNamespace,
PodSelector: metav1.ListOptions{LabelSelector: "app=activator"},
PodLogOptions: &corev1.PodLogOptions{Container: "istio-proxy", SinceTime: &metav1.Time{Time: since}},
JSONLogFilter: func(m map[string]interface{}) bool {
Expand Down
14 changes: 13 additions & 1 deletion test/eventinge2erekt/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package eventinge2erekt

import (
"context"
"log"
"os"
"testing"
"time"

"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/pkg/system"
pkgTest "knative.dev/pkg/test"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
Expand All @@ -20,7 +22,17 @@ var global environment.GlobalEnvironment
func TestMain(m *testing.M) {
broker.EnvCfg.BrokerClass = "MTChannelBasedBroker"

global = environment.NewStandardGlobalEnvironment()
restConfig, err := pkgTest.Flags.ClientConfig.GetRESTConfig()
if err != nil {
log.Fatal("Error building client config: ", err)
}

// Getting the rest config explicitly and passing it further will prevent re-initializing the flagset
// in NewStandardGlobalEnvironment().
global = environment.NewStandardGlobalEnvironment(func(cfg environment.Configuration) environment.Configuration {
cfg.Config = restConfig
return cfg
})

// Run the tests.
os.Exit(m.Run())
Expand Down
9 changes: 5 additions & 4 deletions test/extensione2erekt/features/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/openshift-knative/serverless-operator/test"
eventingfeatures "github.com/openshift-knative/serverless-operator/test/eventinge2erekt/features"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -30,7 +31,7 @@ func verifyEncryptedTrafficToKafkaSink(sinkName string, since time.Time) feature
// source -> kafka-sink-receiver
sinkPath := fmt.Sprintf("/%s/%s", environment.FromContext(ctx).Namespace(), sinkName)
logFilter := eventingfeatures.LogFilter{
PodNamespace: "knative-eventing",
PodNamespace: test.EventingNamespace,
PodSelector: metav1.ListOptions{LabelSelector: "app=kafka-sink-receiver"},
PodLogOptions: &corev1.PodLogOptions{Container: "istio-proxy", SinceTime: &metav1.Time{Time: since}},
JSONLogFilter: func(m map[string]interface{}) bool {
Expand Down Expand Up @@ -75,7 +76,7 @@ func verifyEncryptedTrafficToKafkaBroker(refs []corev1.ObjectReference, namespac
}
// source -> kafka-broker-receiver
brokerPath := fmt.Sprintf("/%s/%s", environment.FromContext(ctx).Namespace(), brokerName)
brokerReceiverNamespace := "knative-eventing"
brokerReceiverNamespace := test.EventingNamespace
if namespacedBroker {
brokerReceiverNamespace = environment.FromContext(ctx).Namespace()
}
Expand Down Expand Up @@ -119,7 +120,7 @@ func verifyEncryptedTrafficToChannelBasedKafkaBroker(refs []corev1.ObjectReferen
environment.FromContext(ctx).Namespace())

logFilter := eventingfeatures.LogFilter{
PodNamespace: "knative-eventing",
PodNamespace: test.EventingNamespace,
PodSelector: metav1.ListOptions{LabelSelector: "app=kafka-channel-receiver"},
PodLogOptions: &corev1.PodLogOptions{Container: "istio-proxy", SinceTime: &metav1.Time{Time: since}},
JSONLogFilter: func(m map[string]interface{}) bool {
Expand Down Expand Up @@ -156,7 +157,7 @@ func verifyEncryptedTrafficToKafkaChannel(refs []corev1.ObjectReference, since t
environment.FromContext(ctx).Namespace())

logFilter := eventingfeatures.LogFilter{
PodNamespace: "knative-eventing",
PodNamespace: test.EventingNamespace,
PodSelector: metav1.ListOptions{LabelSelector: "app=kafka-channel-receiver"},
PodLogOptions: &corev1.PodLogOptions{Container: "istio-proxy", SinceTime: &metav1.Time{Time: since}},
JSONLogFilter: func(m map[string]interface{}) bool {
Expand Down
14 changes: 13 additions & 1 deletion test/extensione2erekt/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package extensione2erekt

import (
"context"
"log"
"os"
"testing"
"time"

"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/pkg/system"
pkgTest "knative.dev/pkg/test"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
Expand All @@ -21,7 +23,17 @@ func TestMain(m *testing.M) {
channel_impl.EnvCfg.ChannelGK = "KafkaChannel.messaging.knative.dev"
channel_impl.EnvCfg.ChannelV = "v1beta1"

global = environment.NewStandardGlobalEnvironment()
restConfig, err := pkgTest.Flags.ClientConfig.GetRESTConfig()
if err != nil {
log.Fatal("Error building client config: ", err)
}

// Getting the rest config explicitly and passing it further will prevent re-initializing the flagset
// in NewStandardGlobalEnvironment().
global = environment.NewStandardGlobalEnvironment(func(cfg environment.Configuration) environment.Configuration {
cfg.Config = restConfig
return cfg
})

// Run the tests.
os.Exit(m.Run())
Expand Down
19 changes: 19 additions & 0 deletions test/kitchensink-upgrade-stress-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash

# Runs the kitchensink upgrade stress tests. Expects the cluster to already be
# prepared (Strimzi + previous Serverless version installed), e.g. via
# `make test-kitchensink-upgrade-stress`.

# shellcheck disable=SC1091,SC1090
source "$(dirname "${BASH_SOURCE[0]}")/lib.bash"

set -Eeuo pipefail

# Enable extra verbosity if running in CI.
# Use ${OPENSHIFT_CI:-} so the check does not abort under `set -u`
# when the variable is undefined outside of CI.
if [ -n "${OPENSHIFT_CI:-}" ]; then
  env
fi
debugging.setup # both install and test
dump_state.setup # test

logger.success '🚀 Cluster prepared for testing.'

kitchensink_upgrade_stress_tests

success
4 changes: 2 additions & 2 deletions test/kitchensinke2e/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
const groupSize = 8

func TestBrokerReadinessBrokerDLS(t *testing.T) {
testFeatureSet(t, features.BrokerFeatureSetWithBrokerDLS(false))
testFeatureSet(t, features.BrokerFeatureSetWithBrokerDLS())
}

func TestBrokerReadinessTriggerDLS(t *testing.T) {
testFeatureSet(t, features.BrokerFeatureSetWithTriggerDLS(false))
testFeatureSet(t, features.BrokerFeatureSetWithTriggerDLS())
}

func split(featureSet feature.FeatureSet, groupSize int) []feature.FeatureSet {
Expand Down
2 changes: 1 addition & 1 deletion test/kitchensinke2e/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import (
)

func TestChannelReadiness(t *testing.T) {
testFeatureSet(t, features.ChannelFeatureSet(false))
testFeatureSet(t, features.ChannelFeatureSet())
}
52 changes: 47 additions & 5 deletions test/kitchensinke2e/features/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ var sinksAll = []component{
inMemoryChannelParallel,
kafkaChannelParallel,
ksvc,
kafkaSink,
}

var sinksShort = []component{
ksvc,
}

// sinksLight is used when deploying multiple instances of the sink
// to reduce CPU/Mem requirements.
var sinksLight = []component{
inMemoryChannel,
}

var brokers = []component{
inMemoryChannelMtBroker,
kafkaChannelMtBroker,
Expand All @@ -37,9 +44,10 @@ var (
deadLetterSinks = sinksAll
deadLetterSinksShort = sinksShort
triggers = sinksAll
triggersShort = sinksShort
)

func BrokerReadiness(broker component, brokerDls component, triggers []component, triggerDls component) *feature.Feature {
func BrokerReadiness(index int, broker component, brokerDls component, triggers []component, triggerDls component) *feature.Feature {
testLabel := shortLabel(broker)
if brokerDls != nil {
testLabel = testLabel + "b" + shortLabel(brokerDls)
Expand All @@ -48,6 +56,8 @@ func BrokerReadiness(broker component, brokerDls component, triggers []component
testLabel = testLabel + "t" + shortLabel(triggerDls)
}

testLabel = fmt.Sprintf("%s-%d", testLabel, index)

brokerName := testLabel
dlsName := testLabel + "-bdls"
triggerDlsName := testLabel + "-tdls"
Expand Down Expand Up @@ -92,17 +102,33 @@ func BrokerReadiness(broker component, brokerDls component, triggers []component
return f
}

// BrokerFeatureSetWithBrokerDLS returns the full set of Broker readiness
// features with the DeadLetterSink configured on the Broker, one instance
// of each combination.
func BrokerFeatureSetWithBrokerDLS() feature.FeatureSet {
	return brokerFeatureSetWithBrokerDLS(false, 1)
}

// BrokerFeatureSetWithBrokerDLSShort returns the reduced (short-mode) set of
// Broker readiness features with the DeadLetterSink configured on the Broker.
func BrokerFeatureSetWithBrokerDLSShort() feature.FeatureSet {
	return brokerFeatureSetWithBrokerDLS(true, 1)
}

// BrokerFeatureSetWithBrokerDLSStress returns the short-mode Broker features
// repeated NumDeployments times each, for upgrade stress testing.
func BrokerFeatureSetWithBrokerDLSStress() feature.FeatureSet {
	return brokerFeatureSetWithBrokerDLS(true, NumDeployments)
}

// BrokerFeatureSetWithBrokerDLS returns all combinations of Broker X DeadLetterSinks,
// each broker with all possible Triggers with the DeadLetterSink set on the Broker.
func BrokerFeatureSetWithBrokerDLS(short bool) feature.FeatureSet {
func brokerFeatureSetWithBrokerDLS(short bool, times int) feature.FeatureSet {
dls := deadLetterSinks
trgs := triggers
if short {
dls = deadLetterSinksShort
trgs = triggersShort
}
features := make([]*feature.Feature, 0, len(brokers)*len(dls))
for _, broker := range brokers {
for _, deadLetterSink := range dls {
features = append(features, BrokerReadiness(broker, deadLetterSink, triggers, nil))
for i := 0; i < times; i++ {
features = append(features, BrokerReadiness(i, broker, deadLetterSink, trgs, nil))
}
}
}
return feature.FeatureSet{
Expand All @@ -111,17 +137,33 @@ func BrokerFeatureSetWithBrokerDLS(short bool) feature.FeatureSet {
}
}

// BrokerFeatureSetWithTriggerDLS returns the full set of Broker readiness
// features with the DeadLetterSink configured on the Trigger, one instance
// of each combination.
func BrokerFeatureSetWithTriggerDLS() feature.FeatureSet {
	return brokerFeatureSetWithTriggerDLS(false, 1)
}

// BrokerFeatureSetWithTriggerDLSShort returns the reduced (short-mode) set of
// Broker readiness features with the DeadLetterSink configured on the Trigger.
func BrokerFeatureSetWithTriggerDLSShort() feature.FeatureSet {
	return brokerFeatureSetWithTriggerDLS(true, 1)
}

// BrokerFeatureSetWithTriggerDLSStress returns the short-mode Trigger-DLS
// features repeated NumDeployments times each, for upgrade stress testing.
func BrokerFeatureSetWithTriggerDLSStress() feature.FeatureSet {
	return brokerFeatureSetWithTriggerDLS(true, NumDeployments)
}

// BrokerFeatureSetWithTriggerDLS returns all combinations of Broker X DeadLetterSinks,
// each broker with all possible Triggers with the DeadLetterSink set on the Trigger.
func BrokerFeatureSetWithTriggerDLS(short bool) feature.FeatureSet {
func brokerFeatureSetWithTriggerDLS(short bool, times int) feature.FeatureSet {
dls := deadLetterSinks
trgs := triggers
if short {
dls = deadLetterSinksShort
trgs = triggersShort
}
features := make([]*feature.Feature, 0, len(brokers)*len(dls))
for _, broker := range brokers {
for _, deadLetterSink := range dls {
features = append(features, BrokerReadiness(broker, nil, triggers, deadLetterSink))
for i := 0; i < times; i++ {
features = append(features, BrokerReadiness(i, broker, nil, trgs, deadLetterSink))
}
}
}
return feature.FeatureSet{
Expand Down
Loading

0 comments on commit 60de240

Please sign in to comment.