Skip to content

Commit

Permalink
Utilize WAL to patch operation history
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 7, 2024
1 parent 10657ef commit 86dc8a6
Show file tree
Hide file tree
Showing 6 changed files with 519 additions and 116 deletions.
26 changes: 15 additions & 11 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ func TestRobustnessRegression(t *testing.T) {
}

func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()

if s.failpoint == nil {
s.failpoint = failpoint.PickRandom(t, report.Cluster)
s.failpoint = failpoint.PickRandom(t, r.Cluster)
} else {
err = failpoint.Validate(report.Cluster, s.failpoint)
err = failpoint.Validate(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
Expand All @@ -86,15 +86,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Error(err)
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

panicked = false
}
Expand Down
237 changes: 237 additions & 0 deletions tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package report

import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/google/go-cmp/cmp"
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/raft/v3/raftpb"
)

func LoadClusterPersistedRequests(lg *zap.Logger, path string) ([]model.EtcdRequest, error) {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}
dataDirs := []string{}
for _, file := range files {
if file.IsDir() && strings.HasPrefix(file.Name(), "server-") {
dataDirs = append(dataDirs, filepath.Join(path, file.Name()))
}
}
return PersistedRequestsDirs(lg, dataDirs)
}

func PersistedRequestsCluster(lg *zap.Logger, cluster *e2e.EtcdProcessCluster) ([]model.EtcdRequest, error) {
dataDirs := []string{}
for _, proc := range cluster.Procs {
dataDirs = append(dataDirs, proc.Config().DataDirPath)
}
return PersistedRequestsDirs(lg, dataDirs)
}

func PersistedRequestsDirs(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest, error) {
persistedRequests := []model.EtcdRequest{}
allowedFailures := len(dataDirs) / 2
for _, dir := range dataDirs {
memberRequests, err := requestsPersistedInWAL(lg, dir)
if err != nil {
if allowedFailures < 1 {
return nil, err
}
allowedFailures--
continue
}
minLength := min(len(persistedRequests), len(memberRequests))
if diff := cmp.Diff(memberRequests[:minLength], persistedRequests[:minLength]); diff != "" {
return nil, fmt.Errorf("unexpected differences between wal entries, diff:\n%s", diff)
}
if len(memberRequests) > len(persistedRequests) {
persistedRequests = memberRequests
}
}
return persistedRequests, nil
}

func requestsPersistedInWAL(lg *zap.Logger, dataDir string) ([]model.EtcdRequest, error) {
_, ents, err := ReadWAL(lg, dataDir)
if err != nil {
return nil, err
}
requests := make([]model.EtcdRequest, 0, len(ents))
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 {
continue
}
request, err := parseEntryNormal(ent)
if err != nil {
return nil, err
}
if request != nil {
requests = append(requests, *request)
}
}
return requests, nil
}

func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raftpb.Entry, err error) {
walDir := filepath.Join(dataDir, "member", "wal")
repaired := false
for {
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
if err != nil {
lg.Fatal("failed to open WAL", zap.Error(err))
}
_, state, ents, err = w.ReadAll()
w.Close()
if err != nil {
if errors.Is(err, wal.ErrSnapshotNotFound) {
return state, ents, nil
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %s", err)
}
if !wal.Repair(lg, walDir) {
return state, nil, fmt.Errorf("failed to repair WAL, err: %s", err)
}
lg.Info("repaired WAL", zap.Error(err))
repaired = true
continue
}
return state, ents, nil
}
}

func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
var raftReq pb.InternalRaftRequest
if err := raftReq.Unmarshal(ent.Data); err != nil {
var r pb.Request
isV2Entry := pbutil.MaybeUnmarshal(&r, ent.Data)
if !isV2Entry {
return nil, err
}
return nil, nil
}
switch {
case raftReq.Put != nil:
op := model.PutOptions{
Key: string(raftReq.Put.Key),
Value: model.ToValueOrHash(string(raftReq.Put.Value)),
LeaseID: raftReq.Put.Lease,
}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{Type: model.PutOperation, Put: op},
},
},
}
return &request, nil
case raftReq.DeleteRange != nil:
op := model.DeleteOptions{Key: string(raftReq.DeleteRange.Key)}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{Type: model.DeleteOperation, Delete: op},
},
},
}
return &request, nil
case raftReq.LeaseGrant != nil:
return &model.EtcdRequest{
Type: model.LeaseGrant,
LeaseGrant: &model.LeaseGrantRequest{LeaseID: raftReq.LeaseGrant.ID},
}, nil
case raftReq.ClusterMemberAttrSet != nil:
return nil, nil
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{}
for _, cmp := range raftReq.Txn.Compare {
txn.Conditions = append(txn.Conditions, model.EtcdCondition{
Key: string(cmp.Key),
ExpectedRevision: cmp.GetModRevision(),
})
}
for _, op := range raftReq.Txn.Success {
txn.OperationsOnSuccess = append(txn.OperationsOnSuccess, toEtcdOperation(op))
}
for _, op := range raftReq.Txn.Failure {
txn.OperationsOnFailure = append(txn.OperationsOnFailure, toEtcdOperation(op))
}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &txn,
}
return &request, nil
default:
panic(fmt.Sprintf("Unhandled raft request: %+v", raftReq))
}
}

func toEtcdOperation(op *pb.RequestOp) (operation model.EtcdOperation) {
switch {
case op.GetRequestRange() != nil:
rangeOp := op.GetRequestRange()
operation = model.EtcdOperation{
Type: model.RangeOperation,
Range: model.RangeOptions{
Start: string(rangeOp.Key),
End: string(rangeOp.RangeEnd),
Limit: rangeOp.Limit,
},
}
case op.GetRequestPut() != nil:
putOp := op.GetRequestPut()
operation = model.EtcdOperation{
Type: model.PutOperation,
Put: model.PutOptions{
Key: string(putOp.Key),
Value: model.ToValueOrHash(string(putOp.Value)),
},
}
case op.GetRequestDeleteRange() != nil:
deleteOp := op.GetRequestDeleteRange()
operation = model.EtcdOperation{
Type: model.DeleteOperation,
Delete: model.DeleteOptions{
Key: string(deleteOp.Key),
},
}
default:
panic(fmt.Sprintf("Unknown op type %v", op))
}
return operation
}
Loading

0 comments on commit 86dc8a6

Please sign in to comment.