Skip to content

Commit

Permalink
tests: Reproduce issue 13766
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Nov 4, 2022
1 parent 38a571a commit 4ad9cb1
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 30 deletions.
16 changes: 14 additions & 2 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"os"
"strings"
"syscall"
"testing"
"time"
Expand All @@ -30,7 +31,9 @@ import (
)

// Log lines used to detect etcd server state from its process output.
var (
	// EtcdServerReadyLines holds the log patterns that mark the server as
	// ready to accept client traffic.
	EtcdServerReadyLines = []string{EtcdServerReadyLog}
	// EtcdServerReadyLog is emitted once etcd can serve client requests.
	EtcdServerReadyLog = "ready to serve client requests"
	// EtcdServerInitialCorruptCheckFailureLog is emitted when the startup
	// corruption check (checkInitialHashKV) fails.
	EtcdServerInitialCorruptCheckFailureLog = "checkInitialHashKV failed"
)

// EtcdProcess is a process that serves etcd requests.
Expand Down Expand Up @@ -165,7 +168,16 @@ func (ep *EtcdServerProcess) Close() error {

// waitReady blocks until the etcd process has logged readiness, then signals
// completion by closing ep.donec.
//
// It waits on either the ready line or the initial-corruption-check failure
// line, so a server that refuses to start after a failed corruption check does
// not hang the wait. If the failure line was the one observed, it is surfaced
// as an error by scanning the captured process output afterwards.
func (ep *EtcdServerProcess) waitReady(ctx context.Context) error {
	defer close(ep.donec)
	err := WaitReadyExpectProc(ctx, ep.proc, []string{EtcdServerReadyLog, EtcdServerInitialCorruptCheckFailureLog})
	if err != nil {
		return err
	}
	// The wait above succeeds on either line; report the corruption-check
	// failure explicitly instead of treating the server as ready.
	for _, line := range ep.proc.Lines() {
		if strings.Contains(line, EtcdServerInitialCorruptCheckFailureLog) {
			return fmt.Errorf("initial check failed: %s", line)
		}
	}
	return nil
}

// Config returns the configuration this server process was created with.
func (ep *EtcdServerProcess) Config() *EtcdServerProcessConfig {
	return ep.cfg
}
Expand Down
76 changes: 48 additions & 28 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package linearizability

import (
"context"
"fmt"
"path/filepath"
"strings"
"sync"
Expand All @@ -29,10 +28,6 @@ import (
)

const (
// minimalQPS is used to validate if enough traffic is send to make tests accurate.
minimalQPS = 100.0
// maximalQPS limits number of requests send to etcd to avoid linearizability analysis taking too long.
maximalQPS = 200.0
// failpointTriggersCount
failpointTriggersCount = 60
// waitBetweenFailpointTriggers
Expand All @@ -42,23 +37,37 @@ const (
func TestLinearizability(t *testing.T) {
testRunner.BeforeTest(t)
tcs := []struct {
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
name string
failpoint Failpoint
config e2e.EtcdProcessClusterConfig
traffic trafficConfig
skipValidation bool
}{
{
name: "KillClusterOfSize1",
failpoint: KillFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
},
traffic: trafficConfig{
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: PutGetTraffic,
},
},
{
name: "KillClusterOfSize3",
failpoint: KillFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
},
traffic: trafficConfig{
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: PutGetTraffic,
},
},
{
name: "Issue14370",
Expand All @@ -67,6 +76,27 @@ func TestLinearizability(t *testing.T) {
ClusterSize: 1,
GoFailEnabled: true,
},
traffic: trafficConfig{
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: PutGetTraffic,
},
},
{
name: "Issue13766",
failpoint: KillFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
InitialCorruptCheck: true,
},
skipValidation: true,
traffic: trafficConfig{
minimalQPS: 2000,
maximalQPS: 4000,
clientCount: 100,
traffic: PutTraffic,
},
},
}
for _, tc := range tcs {
Expand All @@ -76,18 +106,15 @@ func TestLinearizability(t *testing.T) {
count: failpointTriggersCount,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}
traffic := trafficConfig{
minimalQPS: minimalQPS,
maximalQPS: maximalQPS,
clientCount: 8,
traffic: PutGetTraffic,
operations := runClusterAndInjectFailures(context.Background(), t, tc.config, failpoint, tc.traffic)
if !tc.skipValidation {
validateLinearizability(t, operations)
}
testLinearizability(context.Background(), t, tc.config, failpoint, traffic)
})
}
}

func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
func runClusterAndInjectFailures(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) []porcupine.Operation {
clus, err := e2e.NewEtcdProcessCluster(ctx, t, &config)
if err != nil {
t.Fatal(err)
Expand All @@ -101,9 +128,10 @@ func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProce
t.Error(err)
}
}()
operations := simulateTraffic(ctx, t, clus, traffic)
clus.Close()
return simulateTraffic(ctx, t, clus, traffic)
}

func validateLinearizability(t *testing.T, operations []porcupine.Operation) {
linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0)
if linearizable != porcupine.Ok {
t.Error("Model is not linearizable")
Expand All @@ -121,22 +149,14 @@ func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProce
}

func triggerFailpoints(ctx context.Context, clus *e2e.EtcdProcessCluster, config FailpointConfig) error {
var err error
successes := 0
failures := 0
time.Sleep(config.waitBetweenTriggers)
for successes < config.count && failures < config.count {
err = config.failpoint.Trigger(ctx, clus)
for i := 0; i < config.count; i++ {
err := config.failpoint.Trigger(ctx, clus)
if err != nil {
failures++
continue
return err
}
successes++
time.Sleep(config.waitBetweenTriggers)
}
if successes < config.count || failures >= config.count {
return fmt.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
return nil
}

Expand Down
27 changes: 27 additions & 0 deletions tests/linearizability/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// Traffic generators available to test cases.
var (
	// PutGetTraffic — presumably issues a mix of put and get requests,
	// per its name; see putGetTraffic.Run to confirm.
	PutGetTraffic Traffic = putGetTraffic{}
	// PutTraffic issues put requests only (see putTraffic.Run).
	PutTraffic Traffic = putTraffic{}
)

type Traffic interface {
Expand Down Expand Up @@ -62,3 +63,29 @@ func (t putGetTraffic) Run(ctx context.Context, c *recordingClient, limiter *rat
}
return
}

// putTraffic is a Traffic implementation that issues only put requests, all
// against a single fixed key.
type putTraffic struct{}

// Run issues puts for the key "key" until either ctx is canceled or this
// client has performed maxOperationsPerClient successful puts. Each client
// derives a disjoint value range from c.id, so written values are unique
// across clients. A failed put is retried with the same value; i advances
// only on success.
func (t putTraffic) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
	maxOperationsPerClient := 1000000
	id := maxOperationsPerClient * c.id
	key := "key"

	for i := 0; i < maxOperationsPerClient; {
		select {
		case <-ctx.Done():
			return
		default:
		}
		putData := fmt.Sprintf("%d", id+i)
		putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
		err := c.Put(putCtx, key, putData)
		cancel()
		if err != nil {
			continue
		}
		// rate.Limiter.Wait fails only when ctx is canceled or the wait
		// would exceed ctx's deadline; stop instead of busy-retrying.
		if err := limiter.Wait(ctx); err != nil {
			return
		}
		i++
	}
}

0 comments on commit 4ad9cb1

Please sign in to comment.