From a57e967d84fa80eca8aacd785cf6cd915c75cfdd Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 16 Mar 2021 16:08:18 +0100 Subject: [PATCH 1/4] Integration test flakes fixes. --- .../clientv3/connectivity/network_partition_test.go | 6 +++--- tests/integration/snapshot/member_test.go | 3 +++ tests/integration/snapshot/v3_snapshot_test.go | 4 ++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/integration/clientv3/connectivity/network_partition_test.go b/tests/integration/clientv3/connectivity/network_partition_test.go index 8829a6cce35..69cddd56729 100644 --- a/tests/integration/clientv3/connectivity/network_partition_test.go +++ b/tests/integration/clientv3/connectivity/network_partition_test.go @@ -240,9 +240,9 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { clientv3test.MustWaitPinReady(t, watchCli) t.Logf("successful connection with server: %v", target) - // add all eps to list, so that when the original pined one fails - // the client can switch to other available eps - watchCli.SetEndpoints(eps...) + // We stick to the original endpoint, so when the one fails we don't switch + // under the cover to other available eps, but expose the failure to the + // caller (test assertion). wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify()) select { diff --git a/tests/integration/snapshot/member_test.go b/tests/integration/snapshot/member_test.go index 22997272de5..632ca3c512a 100644 --- a/tests/integration/snapshot/member_test.go +++ b/tests/integration/snapshot/member_test.go @@ -26,12 +26,15 @@ import ( "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/tests/v3/integration" ) // TestSnapshotV3RestoreMultiMemberAdd ensures that multiple members // can boot into the same cluster after being restored from a same // snapshot file, and also be able to add another member to the cluster. func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { + integration.BeforeTest(t) + kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} dbPath := createSnapshotFile(t, kvs) diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index 0b7fcc52c35..f70c6846bf9 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -29,12 +29,14 @@ import ( "go.etcd.io/etcd/etcdctl/v3/snapshot" "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/tests/v3/integration" "go.uber.org/zap/zaptest" ) // TestSnapshotV3RestoreSingle tests single node cluster restoring // from a snapshot file. func TestSnapshotV3RestoreSingle(t *testing.T) { + integration.BeforeTest(t) kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} dbPath := createSnapshotFile(t, kvs) defer os.RemoveAll(dbPath) @@ -106,6 +108,7 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { // can boot into the same cluster after being restored from a same // snapshot file. func TestSnapshotV3RestoreMulti(t *testing.T) { + integration.BeforeTest(t) kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} dbPath := createSnapshotFile(t, kvs) defer os.RemoveAll(dbPath) @@ -143,6 +146,7 @@ func TestSnapshotV3RestoreMulti(t *testing.T) { // TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file. func TestCorruptedBackupFileCheck(t *testing.T) { + integration.BeforeTest(t) dbPath := "testdata/corrupted_backup.db" if _, err := os.Stat(dbPath); err != nil { t.Fatalf("test file [%s] does not exist: %v", dbPath, err) From 18382aa234f2ed2393ada6262915a49f6b89c021 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 16 Mar 2021 22:20:00 +0100 Subject: [PATCH 2/4] Fix 2 sources of leaked memory: embed server HTTP & v3_snapshot.leasser. --- etcdctl/snapshot/v3_snapshot.go | 6 +++--- server/embed/etcd.go | 12 ++++-------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 9594bdcb606..0773ab7bd1b 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -355,14 +355,17 @@ func (s *v3Manager) saveDB() error { // update consistentIndex so applies go through on etcdserver despite // having a new raft instance be := backend.NewDefaultBackend(dbpath) + defer be.Close() ci := cindex.NewConsistentIndex(be.BatchTx()) ci.SetConsistentIndex(uint64(commit)) // a lessor never timeouts leases lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci) + defer lessor.Stop() mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) + defer mvs.Close() txn := mvs.Write(traceutil.TODO()) btx := be.BatchTx() del := func(k, v []byte) error { @@ -380,9 +383,6 @@ func (s *v3Manager) saveDB() error { txn.End() mvs.Commit() - mvs.Close() - be.Close() - return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 73ef8bfb8b7..01d5f691662 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -386,18 +386,14 @@ func (e *Etcd) Close() { } func stopServers(ctx context.Context, ss *servers) { - shutdownNow := func() { - // first, close the http.Server - ss.http.Shutdown(ctx) - // then close grpc.Server; cancels all active RPCs - ss.grpc.Stop() - } + // first, close the http.Server + ss.http.Shutdown(ctx) // do not grpc.Server.GracefulStop with TLS enabled etcd server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 if ss.secure { - shutdownNow() + ss.grpc.Stop() return } @@ -415,7 +411,7 @@ func stopServers(ctx context.Context, ss *servers) { case <-ctx.Done(): // took too long, manually close open transports // e.g. watch streams - shutdownNow() + ss.grpc.Stop() // concurrent GracefulStop should be interrupted <-ch From 809e7629edcb09fa0257ca52fec68bc062867a82 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 16 Mar 2021 22:46:35 +0100 Subject: [PATCH 3/4] Add integration.BeforeTest to all missing tests. --- .../recipes/v3_double_barrier_test.go | 4 ++++ .../experimental/recipes/v3_lock_test.go | 13 +++++++++++++ .../experimental/recipes/v3_queue_test.go | 11 +++++++++++ tests/integration/clientv3/mirror_test.go | 2 ++ .../integration/clientv3/watch_fragment_test.go | 2 ++ tests/integration/clientv3/watch_test.go | 16 ++++++++++++++++ tests/integration/snapshot/v3_snapshot_test.go | 2 +- .../v2store/store_tag_not_v2v3_test.go | 2 ++ tests/integration/v2store/store_tag_v2v3_test.go | 1 + tests/integration/v2store/store_v2v3_test.go | 6 +++--- tests/integration/v3_alarm_test.go | 5 +++-- tests/integration/v3_lease_test.go | 12 ++++++++++++ tests/integration/v3_stm_test.go | 12 ++++++++++++ tests/integration/v3_watch_restore_test.go | 2 ++ tests/integration/v3_watch_test.go | 6 ++++++ 15 files changed, 90 insertions(+), 6 deletions(-) diff --git a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go index feaf9a6f4db..463bb605194 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go @@ -24,6 +24,8 @@ import ( ) func TestDoubleBarrier(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -96,6 +98,8 @@ func TestDoubleBarrier(t *testing.T) { } func TestDoubleBarrierFailover(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) diff --git a/tests/integration/clientv3/experimental/recipes/v3_lock_test.go b/tests/integration/clientv3/experimental/recipes/v3_lock_test.go index 9befd5f1d60..7104c3ce74f 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_lock_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_lock_test.go @@ -28,6 +28,8 @@ import ( ) func TestMutexLockSingleNode(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -37,6 +39,8 @@ func TestMutexLockSingleNode(t *testing.T) { } func TestMutexLockMultiNode(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -89,6 +93,7 @@ func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Clie } func TestMutexTryLockSingleNode(t *testing.T) { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -98,6 +103,7 @@ func TestMutexTryLockSingleNode(t *testing.T) { } func TestMutexTryLockMultiNode(t *testing.T) { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -107,6 +113,8 @@ func TestMutexTryLockMultiNode(t *testing.T) { } func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) { + integration.BeforeTest(t) + lockedC := make(chan *concurrency.Mutex) notlockedC := make(chan *concurrency.Mutex) stopC := make(chan struct{}) @@ -155,6 +163,8 @@ func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.C // TestMutexSessionRelock ensures that acquiring the same lock with the same // session will not result in deadlock. func TestMutexSessionRelock(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) session, err := concurrency.NewSession(clus.RandClient()) @@ -285,6 +295,7 @@ func TestMutexWaitsOnCurrentHolder(t *testing.T) { } func BenchmarkMutex4Waiters(b *testing.B) { + integration.BeforeTest(b) // XXX switch tests to use TB interface clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(nil) @@ -294,12 +305,14 @@ func BenchmarkMutex4Waiters(b *testing.B) { } func TestRWMutexSingleNode(t *testing.T) { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) testRWMutex(t, 5, func() *clientv3.Client { return clus.Client(0) }) } func TestRWMutexMultiNode(t *testing.T) { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() }) diff --git a/tests/integration/clientv3/experimental/recipes/v3_queue_test.go b/tests/integration/clientv3/experimental/recipes/v3_queue_test.go index e20932f330a..45d1855b936 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_queue_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_queue_test.go @@ -31,6 +31,8 @@ const ( // TestQueueOneReaderOneWriter confirms the queue is FIFO func TestQueueOneReaderOneWriter(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -76,6 +78,8 @@ func TestQueueManyReaderManyWriter(t *testing.T) { // BenchmarkQueue benchmarks Queues using many/many readers/writers func BenchmarkQueue(b *testing.B) { + integration.BeforeTest(b) + // XXX switch tests to use TB interface clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(nil) @@ -86,6 +90,8 @@ func BenchmarkQueue(b *testing.B) { // TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities. func TestPrQueueOneReaderOneWriter(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -118,6 +124,8 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) { } func TestPrQueueManyReaderManyWriter(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) rqs := newPriorityQueues(clus, manyQueueClients) @@ -127,6 +135,8 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) { // BenchmarkQueue benchmarks Queues using n/n readers/writers func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { + integration.BeforeTest(b) + // XXX switch tests to use TB interface clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(nil) @@ -138,6 +148,7 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { } func testQueueNReaderMWriter(t *testing.T, n int, m int) { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) testReadersWriters(t, newQueues(clus, n), newQueues(clus, m)) diff --git a/tests/integration/clientv3/mirror_test.go b/tests/integration/clientv3/mirror_test.go index 0516c5bff27..c9246e0f2d3 100644 --- a/tests/integration/clientv3/mirror_test.go +++ b/tests/integration/clientv3/mirror_test.go @@ -72,6 +72,8 @@ func TestMirrorSync(t *testing.T) { } func TestMirrorSyncBase(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) diff --git a/tests/integration/clientv3/watch_fragment_test.go b/tests/integration/clientv3/watch_fragment_test.go index b554b6b5d76..b012eebb5ed 100644 --- a/tests/integration/clientv3/watch_fragment_test.go +++ b/tests/integration/clientv3/watch_fragment_test.go @@ -64,6 +64,8 @@ func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) { // testWatchFragment triggers watch response that spans over multiple // revisions exceeding server request limits when combined. func testWatchFragment(t *testing.T, fragment, exceedRecvLimit bool) { + integration.BeforeTest(t) + cfg := &integration.ClusterConfig{ Size: 1, MaxRequestBytes: 1.5 * 1024 * 1024, diff --git a/tests/integration/clientv3/watch_test.go b/tests/integration/clientv3/watch_test.go index 9f2b2627d48..aab6b6b1361 100644 --- a/tests/integration/clientv3/watch_test.go +++ b/tests/integration/clientv3/watch_test.go @@ -299,6 +299,8 @@ func TestWatchCancelRunning(t *testing.T) { } func testWatchCancelRunning(t *testing.T, wctx *watchctx) { + integration.BeforeTest(t) + ctx, cancel := context.WithCancel(context.Background()) if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil { t.Fatalf("expected non-nil watcher channel") @@ -583,6 +585,8 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { } func TestConfigurableWatchProgressNotifyInterval(t *testing.T) { + integration.BeforeTest(t) + progressInterval := 200 * time.Millisecond clus := integration.NewClusterV3(t, &integration.ClusterConfig{ @@ -607,6 +611,8 @@ func TestConfigurableWatchProgressNotifyInterval(t *testing.T) { } func TestWatchRequestProgress(t *testing.T) { + integration.BeforeTest(t) + if integration.ThroughProxy { t.Skipf("grpc-proxy does not support WatchProgress yet") } @@ -682,6 +688,8 @@ func TestWatchRequestProgress(t *testing.T) { } func TestWatchEventType(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) @@ -886,6 +894,8 @@ func TestWatchWithRequireLeader(t *testing.T) { // TestWatchWithFilter checks that watch filtering works. func TestWatchWithFilter(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) @@ -923,6 +933,8 @@ func TestWatchWithFilter(t *testing.T) { // TestWatchWithCreatedNotification checks that WithCreatedNotify returns a // Created watch response. func TestWatchWithCreatedNotification(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) @@ -943,6 +955,8 @@ func TestWatchWithCreatedNotification(t *testing.T) { // a watcher with created notify does not post duplicate // created events from disconnect. func TestWatchWithCreatedNotificationDropConn(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) @@ -970,6 +984,8 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) { // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server. func TestWatchCancelOnServer(t *testing.T) { + integration.BeforeTest(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index f70c6846bf9..3b198792d86 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -146,8 +146,8 @@ func TestSnapshotV3RestoreMulti(t *testing.T) { // TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file. func TestCorruptedBackupFileCheck(t *testing.T) { + dbPath := integration.MustAbsPath("testdata/corrupted_backup.db") integration.BeforeTest(t) - dbPath := "testdata/corrupted_backup.db" if _, err := os.Stat(dbPath); err != nil { t.Fatalf("test file [%s] does not exist: %v", dbPath, err) } diff --git a/tests/integration/v2store/store_tag_not_v2v3_test.go b/tests/integration/v2store/store_tag_not_v2v3_test.go index 64ec2fbdb1c..6567b25b531 100644 --- a/tests/integration/v2store/store_tag_not_v2v3_test.go +++ b/tests/integration/v2store/store_tag_not_v2v3_test.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/tests/v3/integration" ) type v2TestStore struct { @@ -31,6 +32,7 @@ type v2TestStore struct { func (s *v2TestStore) Close() {} func newTestStore(t *testing.T, ns ...string) StoreCloser { + integration.BeforeTest(t) if len(ns) == 0 { t.Logf("new v2 store with no namespace") } diff --git a/tests/integration/v2store/store_tag_v2v3_test.go b/tests/integration/v2store/store_tag_v2v3_test.go index a9dafe664c1..5a9d17b05dd 100644 --- a/tests/integration/v2store/store_tag_v2v3_test.go +++ b/tests/integration/v2store/store_tag_v2v3_test.go @@ -42,6 +42,7 @@ type v2v3TestStore struct { func (s *v2v3TestStore) Close() { s.clus.Terminate(s.t) } func newTestStore(t *testing.T, ns ...string) StoreCloser { + integration.BeforeTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) return &v2v3TestStore{ v2v3.NewStore(clus.Client(0), "/v2/"), diff --git a/tests/integration/v2store/store_v2v3_test.go b/tests/integration/v2store/store_v2v3_test.go index 5a186389b16..9d2c799d5f3 100644 --- a/tests/integration/v2store/store_v2v3_test.go +++ b/tests/integration/v2store/store_v2v3_test.go @@ -19,7 +19,6 @@ import ( "testing" "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v2v3" "go.etcd.io/etcd/tests/v3/integration" @@ -28,18 +27,18 @@ import ( // TODO: fix tests func runWithCluster(t testing.TB, runner func(testing.TB, []string)) { - testutil.BeforeTest(t) + integration.BeforeTest(t) cfg := integration.ClusterConfig{Size: 1} clus := integration.NewClusterV3(t, &cfg) defer clus.Terminate(t) endpoints := []string{clus.Client(0).Endpoints()[0]} runner(t, endpoints) - } func TestCreateKV(t *testing.T) { runWithCluster(t, testCreateKV) } func testCreateKV(t testing.TB, endpoints []string) { + integration.BeforeTest(t) testCases := []struct { key string value string @@ -132,6 +131,7 @@ func testSetKV(t testing.TB, endpoints []string) { func TestCreateSetDir(t *testing.T) { runWithCluster(t, testCreateSetDir) } func testCreateSetDir(t testing.TB, endpoints []string) { + integration.BeforeTest(t) testCases := []struct { dir string }{ diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index b60c7ec7fbe..e85cbc51ce4 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -24,7 +24,6 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc" @@ -35,7 +34,7 @@ import ( // TestV3StorageQuotaApply tests the V3 server respects quotas during apply func TestV3StorageQuotaApply(t *testing.T) { - testutil.BeforeTest(t) + BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) clus := NewClusterV3(t, &ClusterConfig{Size: 2}) @@ -115,6 +114,8 @@ func TestV3StorageQuotaApply(t *testing.T) { // TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through. func TestV3AlarmDeactivate(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) kvc := toGRPC(clus.RandClient()).KV diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 717400e4926..b7c8508a498 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -34,6 +34,8 @@ import ( // to the primary lessor, refresh the leases and start to manage leases. // TODO: use customized clock to make this test go faster? func TestV3LeasePromote(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -227,6 +229,8 @@ func TestV3LeaseKeepAlive(t *testing.T) { // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted // across leader elections. func TestV3LeaseCheckpoint(t *testing.T) { + BeforeTest(t) + var ttl int64 = 300 leaseInterval := 2 * time.Second BeforeTest(t) @@ -644,6 +648,8 @@ const fiveMinTTL int64 = 300 // TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key. func TestV3LeaseRecoverAndRevoke(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -693,6 +699,8 @@ func TestV3LeaseRecoverAndRevoke(t *testing.T) { // TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart. func TestV3LeaseRevokeAndRecover(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -743,6 +751,8 @@ func TestV3LeaseRevokeAndRecover(t *testing.T) { // TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart // does not delete the key. func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -797,6 +807,8 @@ func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { } func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) diff --git a/tests/integration/v3_stm_test.go b/tests/integration/v3_stm_test.go index dc3085b8366..c5a9e41c356 100644 --- a/tests/integration/v3_stm_test.go +++ b/tests/integration/v3_stm_test.go @@ -28,6 +28,8 @@ import ( // TestSTMConflict tests that conflicts are retried. func TestSTMConflict(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -94,6 +96,8 @@ func TestSTMConflict(t *testing.T) { // TestSTMPutNewKey confirms a STM put on a new key is visible after commit. func TestSTMPutNewKey(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -119,6 +123,8 @@ func TestSTMPutNewKey(t *testing.T) { // TestSTMAbort tests that an aborted txn does not modify any keys. func TestSTMAbort(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -148,6 +154,8 @@ func TestSTMAbort(t *testing.T) { // TestSTMSerialize tests that serialization is honored when serializable. func TestSTMSerialize(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -209,6 +217,8 @@ func TestSTMSerialize(t *testing.T) { // TestSTMApplyOnConcurrentDeletion ensures that concurrent key deletion // fails the first GET revision comparison within STM; trigger retry. func TestSTMApplyOnConcurrentDeletion(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -256,6 +266,8 @@ func TestSTMApplyOnConcurrentDeletion(t *testing.T) { } func TestSTMSerializableSnapshotPut(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index a7dc0b504d7..a07dd138c16 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -50,6 +50,8 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *member, metric string, timeo // that were created in synced watcher group in the first place. // TODO: fix panic with gRPC proxy "panic: watcher current revision should not exceed current revision" func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{ Size: 3, SnapshotCount: 10, diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 3688e5c90a2..240af36f4bf 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1074,6 +1074,8 @@ func TestV3WatchClose(t *testing.T) { // TestV3WatchWithFilter ensures watcher filters out the events correctly. func TestV3WatchWithFilter(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -1215,6 +1217,8 @@ func TestV3WatchWithPrevKV(t *testing.T) { // TestV3WatchCancellation ensures that watch cancellation frees up server resources. func TestV3WatchCancellation(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) @@ -1255,6 +1259,8 @@ func TestV3WatchCancellation(t *testing.T) { // TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far. func TestV3WatchCloseCancelRace(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) From 6657d5907ca5cb9913abd30bcc7da777313ea9f3 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 16 Mar 2021 23:30:23 +0100 Subject: [PATCH 4/4] Make sure all integration tests have BeforeTest. The CL disallows to create NewCluster in tests without BeforeTest. --- tests/integration/cluster.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 74e54ca8b93..d55b6e38b9a 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -1272,6 +1272,15 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 { t.Helper() testutil.SkipTestIfShortMode(t, "Cannot create clusters in --short tests") + wd, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + if !strings.HasPrefix(wd, os.TempDir()) { + t.Errorf("Working directory '%s' expected to be in temp-dir ('%s')."+ + "Have you executed integration.BeforeTest(t) ?", wd, os.TempDir()) + } + cfg.UseGRPC = true if os.Getenv("CLIENT_DEBUG") != "" { clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))