From 12ab968e63d667703768840810a7b8ec46b38272 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 4 Nov 2022 15:44:01 +0100 Subject: [PATCH] tests: Add support for lazyfs Signed-off-by: Marek Siarkowicz --- .github/workflows/robustness-nightly.yaml | 8 +- .github/workflows/robustness-template.yaml | 4 + .github/workflows/robustness.yaml | 2 +- Makefile | 14 +++ tests/framework/e2e/cluster.go | 12 ++- tests/framework/e2e/cluster_direct.go | 6 +- tests/framework/e2e/etcd_process.go | 33 +++++- tests/framework/e2e/flags.go | 13 +++ tests/framework/e2e/lazyfs.go | 112 +++++++++++++++++++++ tests/robustness/failpoints.go | 16 ++- tests/robustness/linearizability_test.go | 93 ++++++++++++----- tests/robustness/traffic/etcd.go | 74 ++++++-------- tests/robustness/traffic/kubernetes.go | 27 +++-- tests/robustness/traffic/traffic.go | 41 +++++--- 14 files changed, 345 insertions(+), 110 deletions(-) create mode 100644 tests/framework/e2e/lazyfs.go diff --git a/.github/workflows/robustness-nightly.yaml b/.github/workflows/robustness-nightly.yaml index 2b0ab5ce49d0..0d0c706a95e4 100644 --- a/.github/workflows/robustness-nightly.yaml +++ b/.github/workflows/robustness-nightly.yaml @@ -12,14 +12,14 @@ jobs: uses: ./.github/workflows/robustness-template.yaml with: etcdBranch: main - count: 100 + count: 80 testTimeout: 200m artifactName: main main-arm64: uses: ./.github/workflows/robustness-template-arm64.yaml with: etcdBranch: main - count: 100 + count: 80 testTimeout: 200m artifactName: main-arm64 runs-on: "['self-hosted', 'Linux', 'ARM64']" @@ -27,7 +27,7 @@ jobs: uses: ./.github/workflows/robustness-template.yaml with: etcdBranch: release-3.5 - count: 100 + count: 80 testTimeout: 200m artifactName: release-35 release-35-arm64: @@ -41,6 +41,6 @@ jobs: uses: ./.github/workflows/robustness-template.yaml with: etcdBranch: release-3.4 - count: 100 + count: 80 testTimeout: 200m artifactName: release-34 diff --git a/.github/workflows/robustness-template.yaml b/.github/workflows/robustness-template.yaml index 78de5b78bbd4..5b5177f77738 100644 --- a/.github/workflows/robustness-template.yaml +++ b/.github/workflows/robustness-template.yaml @@ -39,6 +39,10 @@ jobs: set -euo pipefail go clean -testcache + # Build LazyFS + sudo apt-get -y install cmake libfuse3-dev libfuse3-3 fuse3 + make install-lazyfs + # Use --failfast to avoid overriding report generated by failed test GO_TEST_FLAGS="-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestRobustness" case "${ETCD_BRANCH}" in diff --git a/.github/workflows/robustness.yaml b/.github/workflows/robustness.yaml index f9ddcba911f4..80e55a9a3c3b 100644 --- a/.github/workflows/robustness.yaml +++ b/.github/workflows/robustness.yaml @@ -7,6 +7,6 @@ jobs: uses: ./.github/workflows/robustness-template.yaml with: etcdBranch: main - count: 15 + count: 12 testTimeout: 30m artifactName: main diff --git a/Makefile b/Makefile index 200a1fbd1486..38a71cad037a 100644 --- a/Makefile +++ b/Makefile @@ -149,6 +149,20 @@ ifeq (, $(shell which yamlfmt)) endif yamlfmt -conf tools/.yamlfmt . +# Tools + +.PHONY: install-lazyfs +install-lazyfs: bin/lazyfs + +bin/lazyfs: + rm /tmp/lazyfs -rf + git clone https://github.com/dsrhaslab/lazyfs /tmp/lazyfs + cd /tmp/lazyfs/; git checkout 94ef5e60117f2a6c6d12b29e09e287c3893150ca + cd /tmp/lazyfs/libs/libpcache; ./build.sh + cd /tmp/lazyfs/lazyfs; ./build.sh + mkdir -p ./bin + cp /tmp/lazyfs/lazyfs/build/lazyfs ./bin/lazyfs + # Cleanup clean: diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index ac8243d9d7e8..17c3c37a0d3e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckEnabled bool CompactHashCheckTime time.Duration GoFailEnabled bool + LazyFSEnabled bool CompactionBatchLimit int CompactionSleepInterval time.Duration @@ -344,6 +345,10 @@ func WithGoFailEnabled(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.GoFailEnabled = enabled } } +func WithLazyFSEnabled(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.LazyFSEnabled = enabled } +} + func WithWarningUnaryRequestDuration(time time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WarningUnaryRequestDuration = time } } @@ -407,7 +412,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP // launch etcd processes for i := range etcdCfgs { - proc, err := NewEtcdProcess(etcdCfgs[i]) + proc, err := NewEtcdProcess(t, etcdCfgs[i]) if err != nil { epc.Close() return nil, fmt.Errorf("cannot configure: %v", err) @@ -659,6 +664,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in InitialToken: cfg.InitialToken, GoFailPort: gofailPort, Proxy: proxyCfg, + LazyFSEnabled: cfg.LazyFSEnabled, } } @@ -826,7 +832,7 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces // Then start process tb.Log("start new member") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { epc.Close() return 0, fmt.Errorf("cannot configure: %v", err) @@ -855,7 +861,7 @@ func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...E } epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { return err } diff --git a/tests/framework/e2e/cluster_direct.go b/tests/framework/e2e/cluster_direct.go index ac659bd6bcc4..70c60dbf4c0a 100644 --- a/tests/framework/e2e/cluster_direct.go +++ b/tests/framework/e2e/cluster_direct.go @@ -16,6 +16,8 @@ package e2e -func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) { - return NewEtcdServerProcess(cfg) +import "testing" + +func NewEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (EtcdProcess, error) { + return NewEtcdServerProcess(t, cfg) } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 1d8f941bf84a..19323df1f938 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -56,6 +56,7 @@ type EtcdProcess interface { Config() *EtcdServerProcessConfig PeerProxy() proxy.Server Failpoints() *BinaryFailpoints + LazyFS() *LazyFS Logs() LogsExpect Kill() error } @@ -70,6 +71,7 @@ type EtcdServerProcess struct { cfg *EtcdServerProcessConfig proc *expect.ExpectProcess proxy proxy.Server + lazyfs *LazyFS failpoints *BinaryFailpoints donec chan struct{} // closed when Interact() terminates } @@ -96,10 +98,11 @@ type EtcdServerProcessConfig struct { InitialCluster string GoFailPort int - Proxy *proxy.ServerConfig + LazyFSEnabled bool + Proxy *proxy.ServerConfig } -func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { +func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { if !fileutil.Exist(cfg.ExecPath) { return nil, fmt.Errorf("could not find etcd binary: %s", cfg.ExecPath) } @@ -107,11 +110,17 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err if err := os.RemoveAll(cfg.DataDirPath); err != nil { return nil, err } + if err := os.Mkdir(cfg.DataDirPath, 0777); err != nil { + return nil, err + } } ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})} if cfg.GoFailPort != 0 { ep.failpoints = &BinaryFailpoints{member: ep} } + if cfg.LazyFSEnabled { + ep.lazyfs = newLazyFS(cfg.lg, cfg.DataDirPath, t) + } return ep, nil } @@ -146,6 +155,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name)) + err := ep.lazyfs.Start(ctx) + if err != nil { + return err + } + } + ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) if err != nil { @@ -205,6 +222,14 @@ func (ep *EtcdServerProcess) Stop() (err error) { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name)) + err = ep.lazyfs.Stop() + ep.lazyfs = nil + if err != nil { + return err + } + } return nil } @@ -298,6 +323,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server { return ep.proxy } +func (ep *EtcdServerProcess) LazyFS() *LazyFS { + return ep.lazyfs +} + func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints { return ep.failpoints } diff --git a/tests/framework/e2e/flags.go b/tests/framework/e2e/flags.go index ef072f8ad78c..23bdde6590ce 100644 --- a/tests/framework/e2e/flags.go +++ b/tests/framework/e2e/flags.go @@ -48,6 +48,18 @@ type binPath struct { EtcdLastRelease string Etcdctl string Etcdutl string + LazyFS string +} + +func (bp *binPath) LazyFSAvailable() bool { + _, err := os.Stat(bp.LazyFS) + if err != nil { + if !os.IsNotExist(err) { + panic(err) + } + return false + } + return true } func InitFlags() { @@ -65,6 +77,7 @@ func InitFlags() { EtcdLastRelease: *binDir + "/etcd-last-release", Etcdctl: *binDir + "/etcdctl", Etcdutl: *binDir + "/etcdutl", + LazyFS: *binDir + "/lazyfs", } CertPath = CertDir + "/server.crt" PrivateKeyPath = CertDir + "/server.key.insecure" diff --git a/tests/framework/e2e/lazyfs.go b/tests/framework/e2e/lazyfs.go new file mode 100644 index 000000000000..a561bf8284f5 --- /dev/null +++ b/tests/framework/e2e/lazyfs.go @@ -0,0 +1,112 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "go.uber.org/zap" + + "go.etcd.io/etcd/pkg/v3/expect" +) + +func newLazyFS(lg *zap.Logger, dataDir string, tmp TempDirProvider) *LazyFS { + return &LazyFS{ + lg: lg, + DataDir: dataDir, + LazyFSDir: tmp.TempDir(), + } +} + +type TempDirProvider interface { + TempDir() string +} + +type LazyFS struct { + lg *zap.Logger + + DataDir string + LazyFSDir string + + ep *expect.ExpectProcess +} + +func (fs *LazyFS) Start(ctx context.Context) (err error) { + if fs.ep != nil { + return nil + } + err = os.WriteFile(fs.configPath(), fs.config(), 0666) + if err != nil { + return err + } + dataPath := filepath.Join(fs.LazyFSDir, "data") + err = os.Mkdir(dataPath, 0777) + if err != nil { + return err + } + flags := []string{fs.DataDir, "--config-path", fs.configPath(), "-o", "modules=subdir", "-o", "subdir=" + dataPath, "-f"} + fs.lg.Info("Started lazyfs", zap.Strings("flags", flags)) + fs.ep, err = expect.NewExpect(BinPath.LazyFS, flags...) + if err != nil { + return err + } + _, err = fs.ep.ExpectWithContext(ctx, "waiting for fault commands") + return err +} + +func (fs *LazyFS) configPath() string { + return filepath.Join(fs.LazyFSDir, "config.toml") +} + +func (fs *LazyFS) socketPath() string { + return filepath.Join(fs.LazyFSDir, "sock.fifo") +} + +func (fs *LazyFS) config() []byte { + return []byte(fmt.Sprintf(`[faults] +fifo_path=%q +[cache] +apply_eviction=false +[cache.simple] +custom_size="1gb" +blocks_per_page=1 +[filesystem] +log_all_operations=false +`, fs.socketPath())) +} + +func (fs *LazyFS) Stop() error { + if fs.ep == nil { + return nil + } + defer func() { fs.ep = nil }() + err := fs.ep.Stop() + if err != nil { + return err + } + return fs.ep.Close() +} + +func (fs *LazyFS) ClearCache(ctx context.Context) error { + err := os.WriteFile(fs.socketPath(), []byte("lazyfs::clear-cache\n"), 0666) + if err != nil { + return err + } + _, err = fs.ep.ExpectWithContext(ctx, "cache is cleared") + return err +} diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go index 1a47ccd5d458..1e1123593835 100644 --- a/tests/robustness/failpoints.go +++ b/tests/robustness/failpoints.go @@ -208,7 +208,13 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } - + if lazyfs := member.LazyFS(); lazyfs != nil { + lg.Info("Removing data that was not fsynced") + err := lazyfs.ClearCache(ctx) + if err != nil { + return err + } + } err := member.Start(ctx) if err != nil { return err @@ -278,6 +284,14 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg lg.Info("Member exited as expected", zap.String("member", member.Config().Name)) } + if lazyfs := member.LazyFS(); lazyfs != nil { + lg.Info("Removing data that was not fsynced") + err := lazyfs.ClearCache(ctx) + if err != nil { + return err + } + } + return member.Start(ctx) } diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index fc568c22859b..30bf8d8abb78 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "path/filepath" "testing" "time" @@ -34,39 +35,71 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/validate" ) +type TrafficProfile struct { + Traffic traffic.Traffic + Profile traffic.Profile +} + +var trafficProfiles = []TrafficProfile{ + { + Traffic: traffic.EtcdPut, + Profile: traffic.HighTrafficProfile, + }, + { + Traffic: traffic.EtcdPutDeleteLease, + Profile: traffic.LowTraffic, + }, + { + Traffic: traffic.Kubernetes, + Profile: traffic.HighTrafficProfile, + }, + { + Traffic: traffic.Kubernetes, + Profile: traffic.LowTraffic, + }, +} + func TestRobustness(t *testing.T) { testRunner.BeforeTest(t) v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd) if err != nil { t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err) } + enableLazyFS := e2e.BinPath.LazyFSAvailable() + baseOptions := []e2e.EPClusterOption{ + e2e.WithSnapshotCount(100), + e2e.WithGoFailEnabled(true), + e2e.WithCompactionBatchLimit(100), + e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), + } scenarios := []testScenario{} - for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} { + for _, tp := range trafficProfiles { + name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1") + clusterOfSize1Options := baseOptions + clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1)) + // Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS. + if enableLazyFS && tp.Profile.MinimalQPS <= 100 { + clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true)) + name = filepath.Join(name, "LazyFS") + } scenarios = append(scenarios, testScenario{ - name: traffic.Name + "/ClusterOfSize1", - traffic: traffic, - cluster: *e2e.NewConfig( - e2e.WithClusterSize(1), - e2e.WithSnapshotCount(100), - e2e.WithGoFailEnabled(true), - e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints - e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), - ), + name: name, + traffic: tp.Traffic, + cluster: *e2e.NewConfig(clusterOfSize1Options...), }) - clusterOfSize3Options := []e2e.EPClusterOption{ - e2e.WithIsPeerTLS(true), - e2e.WithSnapshotCount(100), - e2e.WithPeerProxy(true), - e2e.WithGoFailEnabled(true), - e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints - e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), - } + } + + for _, tp := range trafficProfiles { + name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3") + clusterOfSize3Options := baseOptions + clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true)) + clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true)) if !v.LessThan(version.V3_6) { clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100)) } scenarios = append(scenarios, testScenario{ - name: traffic.Name + "/ClusterOfSize3", - traffic: traffic, + name: name, + traffic: tp.Traffic, cluster: *e2e.NewConfig(clusterOfSize3Options...), }) } @@ -89,7 +122,8 @@ func TestRobustness(t *testing.T) { scenarios = append(scenarios, testScenario{ name: "Issue13766", failpoint: KillFailpoint, - traffic: traffic.HighTraffic, + profile: traffic.HighTrafficProfile, + traffic: traffic.EtcdPut, cluster: *e2e.NewConfig( e2e.WithSnapshotCount(100), ), @@ -108,7 +142,8 @@ func TestRobustness(t *testing.T) { scenarios = append(scenarios, testScenario{ name: "Issue15271", failpoint: BlackholeUntilSnapshot, - traffic: traffic.HighTraffic, + profile: traffic.HighTrafficProfile, + traffic: traffic.EtcdPut, cluster: *e2e.NewConfig( e2e.WithSnapshotCatchUpEntries(100), e2e.WithSnapshotCount(100), @@ -118,8 +153,11 @@ func TestRobustness(t *testing.T) { }) } for _, scenario := range scenarios { - if scenario.traffic == (traffic.Config{}) { - scenario.traffic = traffic.LowTraffic + if scenario.traffic == nil { + scenario.traffic = traffic.EtcdPutDeleteLease + } + if scenario.profile == (traffic.Profile{}) { + scenario.profile = traffic.LowTraffic } t.Run(scenario.name, func(t *testing.T) { @@ -135,7 +173,8 @@ type testScenario struct { name string failpoint Failpoint cluster e2e.EtcdProcessClusterConfig - traffic traffic.Config + traffic traffic.Traffic + profile traffic.Profile watch watchConfig } @@ -169,7 +208,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce watchProgressNotifyEnabled := report.Cluster.Cfg.WatchProcessNotifyInterval != 0 validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled) - validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.Traffic.ExpectUniqueRevision()} + validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client) panicked = false @@ -193,7 +232,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu maxRevisionChan := make(chan int64, 1) g.Go(func() error { defer close(maxRevisionChan) - operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime, ids) + operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids) maxRevisionChan <- operationsMaxRevision(operationReport) return nil }) diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 714ddac3ce41..3f0e36cc5075 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -28,50 +28,36 @@ import ( ) var ( - LowTraffic = Config{ - Name: "LowTraffic", - minimalQPS: 100, - maximalQPS: 200, - clientCount: 8, - maxNonUniqueRequestConcurrency: 3, - Traffic: etcdTraffic{ - keyCount: 10, - leaseTTL: DefaultLeaseTTL, - largePutSize: 32769, - requests: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 15}, - {choice: List, weight: 15}, - {choice: StaleGet, weight: 10}, - {choice: StaleList, weight: 10}, - {choice: Put, weight: 23}, - {choice: LargePut, weight: 2}, - {choice: Delete, weight: 5}, - {choice: MultiOpTxn, weight: 5}, - {choice: PutWithLease, weight: 5}, - {choice: LeaseRevoke, weight: 5}, - {choice: CompareAndSet, weight: 5}, - }, + EtcdPutDeleteLease = etcdTraffic{ + keyCount: 10, + leaseTTL: DefaultLeaseTTL, + largePutSize: 32769, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 15}, + {choice: List, weight: 15}, + {choice: StaleGet, weight: 10}, + {choice: StaleList, weight: 10}, + {choice: Put, weight: 23}, + {choice: LargePut, weight: 2}, + {choice: Delete, weight: 5}, + {choice: MultiOpTxn, weight: 5}, + {choice: PutWithLease, weight: 5}, + {choice: LeaseRevoke, weight: 5}, + {choice: CompareAndSet, weight: 5}, }, } - HighTraffic = Config{ - Name: "HighTraffic", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, - maxNonUniqueRequestConcurrency: 3, - Traffic: etcdTraffic{ - keyCount: 10, - largePutSize: 32769, - leaseTTL: DefaultLeaseTTL, - requests: []choiceWeight[etcdRequestType]{ - {choice: Get, weight: 15}, - {choice: List, weight: 15}, - {choice: StaleGet, weight: 10}, - {choice: StaleList, weight: 10}, - {choice: Put, weight: 40}, - {choice: MultiOpTxn, weight: 5}, - {choice: LargePut, weight: 5}, - }, + EtcdPut = etcdTraffic{ + keyCount: 10, + largePutSize: 32769, + leaseTTL: DefaultLeaseTTL, + requests: []choiceWeight[etcdRequestType]{ + {choice: Get, weight: 15}, + {choice: List, weight: 15}, + {choice: StaleGet, weight: 10}, + {choice: StaleList, weight: 10}, + {choice: Put, weight: 40}, + {choice: MultiOpTxn, weight: 5}, + {choice: LargePut, weight: 5}, }, } ) @@ -104,6 +90,10 @@ const ( Defragment etcdRequestType = "defragment" ) +func (t etcdTraffic) Name() string { + return "Etcd" +} + func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { lastOperationSucceeded := true var lastRev int64 diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 3d0251d8a818..86fdd0fe9295 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -31,21 +31,14 @@ import ( ) var ( - KubernetesTraffic = Config{ - Name: "Kubernetes", - minimalQPS: 200, - maximalQPS: 1000, - clientCount: 12, - maxNonUniqueRequestConcurrency: 3, - Traffic: kubernetesTraffic{ - averageKeyCount: 10, - resource: "pods", - namespace: "default", - writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 90}, - {choice: KubernetesDelete, weight: 5}, - {choice: KubernetesCreate, weight: 5}, - }, + Kubernetes = kubernetesTraffic{ + averageKeyCount: 10, + resource: "pods", + namespace: "default", + writeChoices: []choiceWeight[KubernetesRequestType]{ + {choice: KubernetesUpdate, weight: 90}, + {choice: KubernetesDelete, weight: 5}, + {choice: KubernetesCreate, weight: 5}, }, } ) @@ -61,6 +54,10 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool { return true } +func (t kubernetesTraffic) Name() string { + return "Kubernetes" +} + func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { kc := &kubernetesClient{client: c} s := newStorage() diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 54c3b02a9fd5..5e4a4b89d1a0 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -34,15 +34,30 @@ var ( RequestTimeout = 40 * time.Millisecond WatchTimeout = 400 * time.Millisecond MultiOpTxnOpCount = 4 + + LowTraffic = Profile{ + Name: "LowTraffic", + MinimalQPS: 100, + MaximalQPS: 200, + ClientCount: 8, + MaxNonUniqueRequestConcurrency: 3, + } + HighTrafficProfile = Profile{ + Name: "HighTraffic", + MinimalQPS: 200, + MaximalQPS: 1000, + ClientCount: 12, + MaxNonUniqueRequestConcurrency: 3, + } ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() lm := identity.NewLeaseIdStorage() reports := []report.ClientReport{} - limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) + limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200) startTime := time.Now() cc, err := NewClient(endpoints, ids, baseTime) @@ -51,8 +66,8 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } defer cc.Close() wg := sync.WaitGroup{} - nonUniqueWriteLimiter := NewConcurrencyLimiter(config.maxNonUniqueRequestConcurrency) - for i := 0; i < config.clientCount; i++ { + nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) + for i := 0; i < profile.ClientCount; i++ { wg.Add(1) c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) if err != nil { @@ -62,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 defer wg.Done() defer c.Close() - config.Traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish) + traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish) mux.Lock() reports = append(reports, c.Report()) mux.Unlock() @@ -87,22 +102,22 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 qps := float64(operationCount) / float64(endTime.Sub(startTime)) * float64(time.Second) lg.Info("Average traffic", zap.Float64("qps", qps)) - if qps < config.minimalQPS { - t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps) + if qps < profile.MinimalQPS { + t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps) } return reports } -type Config struct { +type Profile struct { Name string - minimalQPS float64 - maximalQPS float64 - maxNonUniqueRequestConcurrency int - clientCount int - Traffic Traffic + MinimalQPS float64 + MaximalQPS float64 + MaxNonUniqueRequestConcurrency int + ClientCount int } type Traffic interface { Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) ExpectUniqueRevision() bool + Name() string }