Skip to content

Commit

Permalink
Utilize WAL to patch operation history
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Oct 16, 2023
1 parent f23e34a commit 1e1a281
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 459 deletions.
26 changes: 15 additions & 11 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,18 @@ type testScenario struct {
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()

if s.failpoint == nil {
s.failpoint = pickRandomFailpoint(t, report.Cluster)
s.failpoint = pickRandomFailpoint(t, r.Cluster)
} else {
err = validateFailpoint(report.Cluster, s.failpoint)
err = validateFailpoint(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
Expand All @@ -200,15 +200,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Error(err)
}
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

panicked = false
}
Expand Down
188 changes: 188 additions & 0 deletions tests/robustness/report/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package report

import (
"errors"
"fmt"
"github.com/google/go-cmp/cmp"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/raft/v3/raftpb"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
"strings"
)

func LoadClusterPersistedRequests(lg *zap.Logger, path string) ([]model.EtcdRequest, error) {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}
dataDirs := []string{}
for _, file := range files {
if file.IsDir() && strings.HasPrefix(file.Name(), "server-") {
dataDirs = append(dataDirs, filepath.Join(path, file.Name()))
}
}
return PersistedRequestsDirs(lg, dataDirs)
}

func PersistedRequestsCluster(lg *zap.Logger, cluster *e2e.EtcdProcessCluster) ([]model.EtcdRequest, error) {
dataDirs := []string{}
for _, proc := range cluster.Procs {
dataDirs = append(dataDirs, proc.Config().DataDirPath)
}
return PersistedRequestsDirs(lg, dataDirs)
}

func PersistedRequestsDirs(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest, error) {
persistedRequests := []model.EtcdRequest{}
for _, dir := range dataDirs {
memberRequests, err := requestsPersistedInWAL(lg, dir)
if err != nil {
panic(err)
}
minLength := min(len(persistedRequests), len(memberRequests))
if diff := cmp.Diff(memberRequests[:minLength], persistedRequests[:minLength]); diff != "" {
return nil, fmt.Errorf("unexpected differences between wal entries, diff:\n%s", diff)
}
if len(memberRequests) > len(persistedRequests) {
persistedRequests = memberRequests
}
}
return persistedRequests, nil
}

func requestsPersistedInWAL(lg *zap.Logger, dataDir string) ([]model.EtcdRequest, error) {
state, ents, err := ReadWAL(lg, dataDir)
if err != nil {
return nil, err
}
requests := make([]model.EtcdRequest, 0, len(ents))
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 {
continue
}
if ent.Index > state.Commit {
break
}
request, err := parseEntryNormal(ent)
if err != nil {
return nil, err
}
if request != nil {
requests = append(requests, *request)
}
}
return requests, nil
}

func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raftpb.Entry, err error) {
walDir := filepath.Join(dataDir, "member", "wal")
repaired := false
for {
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
if err != nil {
lg.Fatal("failed to open WAL", zap.Error(err))
}
_, state, ents, err = w.ReadAll()
w.Close()
if err != nil {
if errors.Is(err, wal.ErrSnapshotNotFound) {
return state, ents, nil
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %s", err)
}
if !wal.Repair(lg, walDir) {
return state, nil, fmt.Errorf("failed to repair WAL, err: %s", err)
} else {
lg.Info("repaired WAL", zap.Error(err))
repaired = true
}
continue
}
return state, ents, nil
}
}

func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
var raftReq pb.InternalRaftRequest
if err := raftReq.Unmarshal(ent.Data); err != nil {
return nil, err
}
switch {
case raftReq.Put != nil:
op := model.PutOptions{Key: string(raftReq.Put.Key), Value: model.ToValueOrHash(string(raftReq.Put.Value))}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{Type: model.PutOperation, Put: op},
},
},
}
return &request, nil
case raftReq.ClusterMemberAttrSet != nil:
return nil, nil
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{}
if raftReq.Txn.Failure != nil {
panic("not empty failure operation")
}
if raftReq.Txn.Compare != nil {
panic("not empty conditions")
}
for _, op := range raftReq.Txn.Success {
var operation model.EtcdOperation
switch {
case op.GetRequestRange() != nil:
rangeOp := op.GetRequestRange()
operation = model.EtcdOperation{
Type: model.RangeOperation,
Range: model.RangeOptions{
Start: string(rangeOp.Key),
End: string(rangeOp.RangeEnd),
Limit: rangeOp.Limit,
},
}
case op.GetRequestPut() != nil:
putOp := op.GetRequestPut()
operation = model.EtcdOperation{
Type: model.PutOperation,
Put: model.PutOptions{
Key: string(putOp.Key),
Value: model.ToValueOrHash(string(putOp.Value)),
},
}
case op.GetRequestDeleteRange() != nil:
deleteOp := op.GetRequestDeleteRange()
operation = model.EtcdOperation{
Type: model.DeleteOperation,
Delete: model.DeleteOptions{
Key: string(deleteOp.Key),
},
}
default:
panic(fmt.Sprintf("Unknown op type %v", op))
}
txn.OperationsOnSuccess = append(txn.OperationsOnSuccess, operation)
}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &txn,
}
return &request, nil
default:
panic(fmt.Sprintf("Unhandled raft request: %+v", raftReq))
}
}
Binary file not shown.
Loading

0 comments on commit 1e1a281

Please sign in to comment.