From 93eb21eab9d20ec0abc1ebda1eba4f2f3dcd09a6 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 4 Nov 2022 15:44:01 +0100 Subject: [PATCH] tests: Add support for lazyfs Signed-off-by: Marek Siarkowicz --- .github/workflows/robustness-template.yaml | 4 + Makefile | 14 +++ tests/framework/e2e/cluster.go | 12 ++- tests/framework/e2e/cluster_direct.go | 6 +- tests/framework/e2e/etcd_process.go | 33 +++++- tests/framework/e2e/flags.go | 13 +++ tests/framework/e2e/lazyfs.go | 111 +++++++++++++++++++++ tests/robustness/failpoints.go | 16 ++- tests/robustness/linearizability_test.go | 34 ++++--- 9 files changed, 220 insertions(+), 23 deletions(-) create mode 100644 tests/framework/e2e/lazyfs.go diff --git a/.github/workflows/robustness-template.yaml b/.github/workflows/robustness-template.yaml index 78de5b78bbd4..aaf895de83aa 100644 --- a/.github/workflows/robustness-template.yaml +++ b/.github/workflows/robustness-template.yaml @@ -38,6 +38,10 @@ jobs: run: | set -euo pipefail go clean -testcache + + # Build LazyFS + sudo apt-get -y install cmake libfuse3-dev libfuse3-3 fuse3 + make install-lazyfs # Use --failfast to avoid overriding report generated by failed test GO_TEST_FLAGS="-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestRobustness" diff --git a/Makefile b/Makefile index 200a1fbd1486..38a71cad037a 100644 --- a/Makefile +++ b/Makefile @@ -149,6 +149,20 @@ ifeq (, $(shell which yamlfmt)) endif yamlfmt -conf tools/.yamlfmt . +# Tools + +.PHONY: install-lazyfs +install-lazyfs: bin/lazyfs + +bin/lazyfs: + rm /tmp/lazyfs -rf + git clone https://github.com/dsrhaslab/lazyfs /tmp/lazyfs + cd /tmp/lazyfs/; git checkout 94ef5e60117f2a6c6d12b29e09e287c3893150ca + cd /tmp/lazyfs/libs/libpcache; ./build.sh + cd /tmp/lazyfs/lazyfs; ./build.sh + mkdir -p ./bin + cp /tmp/lazyfs/lazyfs/build/lazyfs ./bin/lazyfs + # Cleanup clean: diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index ac8243d9d7e8..17c3c37a0d3e 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct { CompactHashCheckEnabled bool CompactHashCheckTime time.Duration GoFailEnabled bool + LazyFSEnabled bool CompactionBatchLimit int CompactionSleepInterval time.Duration @@ -344,6 +345,10 @@ func WithGoFailEnabled(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.GoFailEnabled = enabled } } +func WithLazyFSEnabled(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.LazyFSEnabled = enabled } +} + func WithWarningUnaryRequestDuration(time time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.WarningUnaryRequestDuration = time } } @@ -407,7 +412,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP // launch etcd processes for i := range etcdCfgs { - proc, err := NewEtcdProcess(etcdCfgs[i]) + proc, err := NewEtcdProcess(t, etcdCfgs[i]) if err != nil { epc.Close() return nil, fmt.Errorf("cannot configure: %v", err) @@ -659,6 +664,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in InitialToken: cfg.InitialToken, GoFailPort: gofailPort, Proxy: proxyCfg, + LazyFSEnabled: cfg.LazyFSEnabled, } } @@ -826,7 +832,7 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces // Then start process tb.Log("start new member") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { epc.Close() return 0, fmt.Errorf("cannot configure: %v", err) @@ -855,7 +861,7 @@ func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...E } epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new") - proc, err := NewEtcdProcess(serverCfg) + proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { return err } diff --git a/tests/framework/e2e/cluster_direct.go b/tests/framework/e2e/cluster_direct.go index ac659bd6bcc4..70c60dbf4c0a 100644 --- a/tests/framework/e2e/cluster_direct.go +++ b/tests/framework/e2e/cluster_direct.go @@ -16,6 +16,8 @@ package e2e -func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) { - return NewEtcdServerProcess(cfg) +import "testing" + +func NewEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (EtcdProcess, error) { + return NewEtcdServerProcess(t, cfg) } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 1d8f941bf84a..19323df1f938 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -56,6 +56,7 @@ type EtcdProcess interface { Config() *EtcdServerProcessConfig PeerProxy() proxy.Server Failpoints() *BinaryFailpoints + LazyFS() *LazyFS Logs() LogsExpect Kill() error } @@ -70,6 +71,7 @@ type EtcdServerProcess struct { cfg *EtcdServerProcessConfig proc *expect.ExpectProcess proxy proxy.Server + lazyfs *LazyFS failpoints *BinaryFailpoints donec chan struct{} // closed when Interact() terminates } @@ -96,10 +98,11 @@ type EtcdServerProcessConfig struct { InitialCluster string GoFailPort int - Proxy *proxy.ServerConfig + LazyFSEnabled bool + Proxy *proxy.ServerConfig } -func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { +func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { if !fileutil.Exist(cfg.ExecPath) { return nil, fmt.Errorf("could not find etcd binary: %s", cfg.ExecPath) } @@ -107,11 +110,17 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err if err := os.RemoveAll(cfg.DataDirPath); err != nil { return nil, err } + if err := os.Mkdir(cfg.DataDirPath, 0777); err != nil { + return nil, err + } } ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})} if cfg.GoFailPort != 0 { ep.failpoints = &BinaryFailpoints{member: ep} } + if cfg.LazyFSEnabled { + ep.lazyfs = newLazyFS(cfg.lg, cfg.DataDirPath, t) + } return ep, nil } @@ -146,6 +155,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name)) + err := ep.lazyfs.Start(ctx) + if err != nil { + return err + } + } + ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) if err != nil { @@ -205,6 +222,14 @@ func (ep *EtcdServerProcess) Stop() (err error) { return err } } + if ep.lazyfs != nil { + ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name)) + err = ep.lazyfs.Stop() + ep.lazyfs = nil + if err != nil { + return err + } + } return nil } @@ -298,6 +323,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server { return ep.proxy } +func (ep *EtcdServerProcess) LazyFS() *LazyFS { + return ep.lazyfs +} + func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints { return ep.failpoints } diff --git a/tests/framework/e2e/flags.go b/tests/framework/e2e/flags.go index ef072f8ad78c..23bdde6590ce 100644 --- a/tests/framework/e2e/flags.go +++ b/tests/framework/e2e/flags.go @@ -48,6 +48,18 @@ type binPath struct { EtcdLastRelease string Etcdctl string Etcdutl string + LazyFS string +} + +func (bp *binPath) LazyFSAvailable() bool { + _, err := os.Stat(bp.LazyFS) + if err != nil { + if !os.IsNotExist(err) { + panic(err) + } + return false + } + return true } func InitFlags() { @@ -65,6 +77,7 @@ func InitFlags() { EtcdLastRelease: *binDir + "/etcd-last-release", Etcdctl: *binDir + "/etcdctl", Etcdutl: *binDir + "/etcdutl", + LazyFS: *binDir + "/lazyfs", } CertPath = CertDir + "/server.crt" PrivateKeyPath = CertDir + "/server.key.insecure" diff --git a/tests/framework/e2e/lazyfs.go b/tests/framework/e2e/lazyfs.go new file mode 100644 index 000000000000..ccd0b30f4903 --- /dev/null +++ b/tests/framework/e2e/lazyfs.go @@ -0,0 +1,111 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "go.etcd.io/etcd/pkg/v3/expect" + "go.uber.org/zap" +) + +func newLazyFS(lg *zap.Logger, dataDir string, tmp TempDirProvider) *LazyFS { + return &LazyFS{ + lg: lg, + DataDir: dataDir, + LazyFSDir: tmp.TempDir(), + } +} + +type TempDirProvider interface { + TempDir() string +} + +type LazyFS struct { + lg *zap.Logger + + DataDir string + LazyFSDir string + + ep *expect.ExpectProcess +} + +func (fs *LazyFS) Start(ctx context.Context) (err error) { + if fs.ep != nil { + return nil + } + err = os.WriteFile(fs.configPath(), fs.config(), 0666) + if err != nil { + return err + } + dataPath := filepath.Join(fs.LazyFSDir, "data") + err = os.Mkdir(dataPath, 0777) + if err != nil { + return err + } + flags := []string{fs.DataDir, "--config-path", fs.configPath(), "-o", "allow_other", "-o", "modules=subdir", "-o", "subdir=" + dataPath, "-f"} + fs.lg.Info("Started lazyfs", zap.Strings("flags", flags)) + fs.ep, err = expect.NewExpect(BinPath.LazyFS, flags...) + if err != nil { + return err + } + _, err = fs.ep.ExpectWithContext(ctx, "waiting for fault commands") + return err +} + +func (fs *LazyFS) configPath() string { + return filepath.Join(fs.LazyFSDir, "config.toml") +} + +func (fs *LazyFS) socketPath() string { + return filepath.Join(fs.LazyFSDir, "sock.fifo") +} + +func (fs *LazyFS) config() []byte { + return []byte(fmt.Sprintf(`[faults] +fifo_path=%q +[cache] +apply_eviction=false +[cache.simple] +custom_size="1gb" +blocks_per_page=1 +[filesystem] +log_all_operations=false +`, fs.socketPath())) +} + +func (fs *LazyFS) Stop() error { + if fs.ep == nil { + return nil + } + defer func() { fs.ep = nil }() + err := fs.ep.Stop() + if err != nil { + return err + } + return fs.ep.Close() +} + +func (fs *LazyFS) ClearCache(ctx context.Context) error { + err := os.WriteFile(fs.socketPath(), []byte("lazyfs::clear-cache\n"), 0666) + if err != nil { + return err + } + _, err = fs.ep.ExpectWithContext(ctx, "cache is cleared") + return err +} diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go index b954d4288f58..779424f55b21 100644 --- a/tests/robustness/failpoints.go +++ b/tests/robustness/failpoints.go @@ -208,7 +208,13 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } - + if lazyfs := member.LazyFS(); lazyfs != nil { + lg.Info("Removing data that was not fsynced") + err := lazyfs.ClearCache(ctx) + if err != nil { + return err + } + } err := member.Start(ctx) if err != nil { return err @@ -278,6 +284,14 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg lg.Info("Member exited as expected", zap.String("member", member.Config().Name)) } + if lazyfs := member.LazyFS(); lazyfs != nil { + lg.Info("Removing data that was not fsynced") + err := lazyfs.ClearCache(ctx) + if err != nil { + return err + } + } + return member.Start(ctx) } diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index fc568c22859b..bf75174c2a42 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -40,27 +40,31 @@ func TestRobustness(t *testing.T) { if err != nil { t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err) } + enableLazyFS := e2e.BinPath.LazyFSAvailable() + baseOptions := []e2e.EPClusterOption{ + e2e.WithSnapshotCount(100), + e2e.WithGoFailEnabled(true), + e2e.WithCompactionBatchLimit(100), + e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), + } scenarios := []testScenario{} for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} { + clusterOfSize1Options := baseOptions + clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1)) + if enableLazyFS { + clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true)) + } scenarios = append(scenarios, testScenario{ name: traffic.Name + "/ClusterOfSize1", traffic: traffic, - cluster: *e2e.NewConfig( - e2e.WithClusterSize(1), - e2e.WithSnapshotCount(100), - e2e.WithGoFailEnabled(true), - e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints - e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), - ), + cluster: *e2e.NewConfig(clusterOfSize1Options...), }) - clusterOfSize3Options := []e2e.EPClusterOption{ - e2e.WithIsPeerTLS(true), - e2e.WithSnapshotCount(100), - e2e.WithPeerProxy(true), - e2e.WithGoFailEnabled(true), - e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints - e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), - } + } + + for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} { + clusterOfSize3Options := baseOptions + clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true)) + clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true)) if !v.LessThan(version.V3_6) { clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100)) }