Skip to content

Commit

Permalink
Merge pull request #12782 from ptabor/20210316-fixes
Browse files Browse the repository at this point in the history
Integration: Test flakiness fixes
  • Loading branch information
ptabor authored Mar 19, 2021
2 parents 18321a0 + 6657d59 commit 8469108
Show file tree
Hide file tree
Showing 20 changed files with 112 additions and 19 deletions.
6 changes: 3 additions & 3 deletions etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,17 @@ func (s *v3Manager) saveDB() error {
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
defer be.Close()

ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(uint64(commit))

// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
defer lessor.Stop()

mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
defer mvs.Close()
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
del := func(k, v []byte) error {
Expand All @@ -380,9 +383,6 @@ func (s *v3Manager) saveDB() error {
txn.End()

mvs.Commit()
mvs.Close()
be.Close()

return nil
}

Expand Down
12 changes: 4 additions & 8 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,18 +386,14 @@ func (e *Etcd) Close() {
}

func stopServers(ctx context.Context, ss *servers) {
shutdownNow := func() {
// first, close the http.Server
ss.http.Shutdown(ctx)
// then close grpc.Server; cancels all active RPCs
ss.grpc.Stop()
}
// first, close the http.Server
ss.http.Shutdown(ctx)

// do not grpc.Server.GracefulStop with TLS enabled etcd server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if ss.secure {
shutdownNow()
ss.grpc.Stop()
return
}

Expand All @@ -415,7 +411,7 @@ func stopServers(ctx context.Context, ss *servers) {
case <-ctx.Done():
// took too long, manually close open transports
// e.g. watch streams
shutdownNow()
ss.grpc.Stop()

// concurrent GracefulStop should be interrupted
<-ch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
clientv3test.MustWaitPinReady(t, watchCli)
t.Logf("successful connection with server: %v", target)

// add all eps to list, so that when the original pined one fails
// the client can switch to other available eps
watchCli.SetEndpoints(eps...)
// We stick to the original endpoint, so when the one fails we don't switch
// under the cover to other available eps, but expose the failure to the
// caller (test assertion).

wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
)

func TestDoubleBarrier(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand Down Expand Up @@ -96,6 +98,8 @@ func TestDoubleBarrier(t *testing.T) {
}

func TestDoubleBarrierFailover(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand Down
13 changes: 13 additions & 0 deletions tests/integration/clientv3/experimental/recipes/v3_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
)

func TestMutexLockSingleNode(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand All @@ -37,6 +39,8 @@ func TestMutexLockSingleNode(t *testing.T) {
}

func TestMutexLockMultiNode(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand Down Expand Up @@ -89,6 +93,7 @@ func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Clie
}

func TestMutexTryLockSingleNode(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand All @@ -98,6 +103,7 @@ func TestMutexTryLockSingleNode(t *testing.T) {
}

func TestMutexTryLockMultiNode(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand All @@ -107,6 +113,8 @@ func TestMutexTryLockMultiNode(t *testing.T) {
}

func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
integration.BeforeTest(t)

lockedC := make(chan *concurrency.Mutex)
notlockedC := make(chan *concurrency.Mutex)
stopC := make(chan struct{})
Expand Down Expand Up @@ -155,6 +163,8 @@ func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.C
// TestMutexSessionRelock ensures that acquiring the same lock with the same
// session will not result in deadlock.
func TestMutexSessionRelock(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
session, err := concurrency.NewSession(clus.RandClient())
Expand Down Expand Up @@ -285,6 +295,7 @@ func TestMutexWaitsOnCurrentHolder(t *testing.T) {
}

func BenchmarkMutex4Waiters(b *testing.B) {
integration.BeforeTest(b)
// XXX switch tests to use TB interface
clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(nil)
Expand All @@ -294,12 +305,14 @@ func BenchmarkMutex4Waiters(b *testing.B) {
}

func TestRWMutexSingleNode(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *clientv3.Client { return clus.Client(0) })
}

func TestRWMutexMultiNode(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
testRWMutex(t, 5, func() *clientv3.Client { return clus.RandClient() })
Expand Down
11 changes: 11 additions & 0 deletions tests/integration/clientv3/experimental/recipes/v3_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (

// TestQueueOneReaderOneWriter confirms the queue is FIFO
func TestQueueOneReaderOneWriter(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

Expand Down Expand Up @@ -76,6 +78,8 @@ func TestQueueManyReaderManyWriter(t *testing.T) {

// BenchmarkQueue benchmarks Queues using many/many readers/writers
func BenchmarkQueue(b *testing.B) {
integration.BeforeTest(b)

// XXX switch tests to use TB interface
clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(nil)
Expand All @@ -86,6 +90,8 @@ func BenchmarkQueue(b *testing.B) {

// TestPrQueueOneReaderOneWriter tests whether priority queues respect priorities.
func TestPrQueueOneReaderOneWriter(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

Expand Down Expand Up @@ -118,6 +124,8 @@ func TestPrQueueOneReaderOneWriter(t *testing.T) {
}

func TestPrQueueManyReaderManyWriter(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
rqs := newPriorityQueues(clus, manyQueueClients)
Expand All @@ -127,6 +135,8 @@ func TestPrQueueManyReaderManyWriter(t *testing.T) {

// BenchmarkQueue benchmarks Queues using n/n readers/writers
func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
integration.BeforeTest(b)

// XXX switch tests to use TB interface
clus := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(nil)
Expand All @@ -138,6 +148,7 @@ func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) {
}

func testQueueNReaderMWriter(t *testing.T, n int, m int) {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
testReadersWriters(t, newQueues(clus, n), newQueues(clus, m))
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/clientv3/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func TestMirrorSync(t *testing.T) {
}

func TestMirrorSyncBase(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand Down
2 changes: 2 additions & 0 deletions tests/integration/clientv3/watch_fragment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) {
// testWatchFragment triggers watch response that spans over multiple
// revisions exceeding server request limits when combined.
func testWatchFragment(t *testing.T, fragment, exceedRecvLimit bool) {
integration.BeforeTest(t)

cfg := &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
Expand Down
16 changes: 16 additions & 0 deletions tests/integration/clientv3/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ func TestWatchCancelRunning(t *testing.T) {
}

func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
integration.BeforeTest(t)

ctx, cancel := context.WithCancel(context.Background())
if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
t.Fatalf("expected non-nil watcher channel")
Expand Down Expand Up @@ -583,6 +585,8 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}

func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
integration.BeforeTest(t)

progressInterval := 200 * time.Millisecond
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Expand All @@ -607,6 +611,8 @@ func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
}

func TestWatchRequestProgress(t *testing.T) {
integration.BeforeTest(t)

if integration.ThroughProxy {
t.Skipf("grpc-proxy does not support WatchProgress yet")
}
Expand Down Expand Up @@ -682,6 +688,8 @@ func TestWatchRequestProgress(t *testing.T) {
}

func TestWatchEventType(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand Down Expand Up @@ -886,6 +894,8 @@ func TestWatchWithRequireLeader(t *testing.T) {

// TestWatchWithFilter checks that watch filtering works.
func TestWatchWithFilter(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand Down Expand Up @@ -923,6 +933,8 @@ func TestWatchWithFilter(t *testing.T) {
// TestWatchWithCreatedNotification checks that WithCreatedNotify returns a
// Created watch response.
func TestWatchWithCreatedNotification(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand All @@ -943,6 +955,8 @@ func TestWatchWithCreatedNotification(t *testing.T) {
// a watcher with created notify does not post duplicate
// created events from disconnect.
func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand Down Expand Up @@ -970,6 +984,8 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {

// TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
func TestWatchCancelOnServer(t *testing.T) {
integration.BeforeTest(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

Expand Down
9 changes: 9 additions & 0 deletions tests/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,15 @@ func NewClusterV3(t testutil.TB, cfg *ClusterConfig) *ClusterV3 {
t.Helper()
testutil.SkipTestIfShortMode(t, "Cannot create clusters in --short tests")

wd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
if !strings.HasPrefix(wd, os.TempDir()) {
t.Errorf("Working directory '%s' expected to be in temp-dir ('%s')."+
"Have you executed integration.BeforeTest(t) ?", wd, os.TempDir())
}

cfg.UseGRPC = true

clus := &ClusterV3{
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/snapshot/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
// can boot into the same cluster after being restored from a same
// snapshot file, and also be able to add another member to the cluster.
func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) {
integration.BeforeTest(t)

kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
dbPath := createSnapshotFile(t, kvs)

Expand Down
2 changes: 2 additions & 0 deletions tests/integration/snapshot/v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
// TestSnapshotV3RestoreSingle tests single node cluster restoring
// from a snapshot file.
func TestSnapshotV3RestoreSingle(t *testing.T) {
integration.BeforeTest(t)
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
dbPath := createSnapshotFile(t, kvs)
defer os.RemoveAll(dbPath)
Expand Down Expand Up @@ -103,6 +104,7 @@ func TestSnapshotV3RestoreSingle(t *testing.T) {
// can boot into the same cluster after being restored from a same
// snapshot file.
func TestSnapshotV3RestoreMulti(t *testing.T) {
integration.BeforeTest(t)
kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
dbPath := createSnapshotFile(t, kvs)

Expand Down
2 changes: 2 additions & 0 deletions tests/integration/v2store/store_tag_not_v2v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.etcd.io/etcd/pkg/v3/testutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/tests/v3/integration"
)

type v2TestStore struct {
Expand All @@ -31,6 +32,7 @@ type v2TestStore struct {
func (s *v2TestStore) Close() {}

func newTestStore(t *testing.T, ns ...string) StoreCloser {
integration.BeforeTest(t)
if len(ns) == 0 {
t.Logf("new v2 store with no namespace")
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/v2store/store_tag_v2v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type v2v3TestStore struct {
func (s *v2v3TestStore) Close() { s.clus.Terminate(s.t) }

func newTestStore(t *testing.T, ns ...string) StoreCloser {
integration.BeforeTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
return &v2v3TestStore{
v2v3.NewStore(clus.Client(0), "/v2/"),
Expand Down
Loading

0 comments on commit 8469108

Please sign in to comment.