diff --git a/tests/e2e/cluster_downgrade_test.go b/tests/e2e/cluster_downgrade_test.go index fb6f7d954f2..64d6b685064 100644 --- a/tests/e2e/cluster_downgrade_test.go +++ b/tests/e2e/cluster_downgrade_test.go @@ -23,25 +23,37 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/fileutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" + "go.etcd.io/etcd/server/v3/storage/datadir" + "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/framework/testutils" ) func TestDowngradeUpgradeClusterOf1(t *testing.T) { - testDowngradeUpgrade(t, 1) + testDowngradeUpgrade(t, 1, false) } func TestDowngradeUpgradeClusterOf3(t *testing.T) { - testDowngradeUpgrade(t, 3) + testDowngradeUpgrade(t, 3, false) } -func testDowngradeUpgrade(t *testing.T, clusterSize int) { +func TestDowngradeUpgradeClusterOf1WithSnapshot(t *testing.T) { + testDowngradeUpgrade(t, 1, true) +} + +func TestDowngradeUpgradeClusterOf3WithSnapshot(t *testing.T) { + testDowngradeUpgrade(t, 3, true) +} + +func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) { currentEtcdBinary := e2e.BinPath.Etcd lastReleaseBinary := e2e.BinPath.EtcdLastRelease if !fileutil.Exist(lastReleaseBinary) { @@ -67,7 +79,8 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) { e2e.BeforeTest(t) t.Logf("Create cluster with version %s", currentVersionStr) - epc := newCluster(t, clusterSize) + var snapshotCount uint64 = 10 + epc := newCluster(t, clusterSize, snapshotCount) for i := 0; i < len(epc.Procs); i++ { validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{ Cluster: currentVersionStr, @@ -76,6 +89,11 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) { }) } t.Logf("Cluster created") + if triggerSnapshot { + t.Logf("Generating snapshot") + generateSnapshot(t, snapshotCount, epc) + } + bm, bkv := getMembersAndKeys(t, epc) t.Logf("etcdctl downgrade enable %s", lastVersionStr) downgradeEnable(t, epc, lastVersion) @@ -108,6 +126,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) { } t.Log("Downgrade complete") + am, akv := getMembersAndKeys(t, epc) + assert.Equal(t, bkv.Kvs, akv.Kvs) + assert.Equal(t, bm.Members, am.Members) + t.Logf("Starting upgrade process to %q", currentVersionStr) for i := 0; i < len(epc.Procs); i++ { t.Logf("Upgrading member %d", i) @@ -129,9 +151,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int) { t.Log("Upgrade complete") } -func newCluster(t *testing.T, clusterSize int) *e2e.EtcdProcessCluster { +func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdProcessCluster { epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithClusterSize(clusterSize), + e2e.WithSnapshotCount(snapshotCount), e2e.WithKeepDataDir(true), ) if err != nil { @@ -243,3 +266,45 @@ func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdPr } return result, nil } + +func generateSnapshot(t *testing.T, snapshotCount uint64, epc *e2e.EtcdProcessCluster) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC()) + assert.NoError(t, err) + + var i uint64 + t.Logf("Adding keys") + for i = 0; i < snapshotCount*3; i++ { + err := cc.Put(ctx, fmt.Sprintf("%d", i), "1", config.PutOptions{}) + assert.NoError(t, err) + } + verifySnapshot(t, epc) +} + +func verifySnapshot(t *testing.T, epc *e2e.EtcdProcessCluster) { + for i := range epc.Procs { + t.Logf("Verifying snapshot for member %d", i) + ss := snap.New(epc.Cfg.Logger, datadir.ToSnapDir(epc.Procs[i].Config().DataDirPath)) + _, err := ss.Load() + assert.NoError(t, err) + } + t.Logf("All members have a valid snapshot") +} + +func getMembersAndKeys(t *testing.T, epc *e2e.EtcdProcessCluster) (*clientv3.MemberListResponse, *clientv3.GetResponse) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC()) + assert.NoError(t, err) + + kvs, err := cc.Get(ctx, "", config.GetOptions{Prefix: true}) + assert.NoError(t, err) + + members, err := cc.MemberList(ctx, false) + assert.NoError(t, err) + + return members, kvs +}