From e770230d651364b34b3fd88b8c9a6e941bfe2158 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 15 Oct 2023 11:39:35 +0200 Subject: [PATCH] Utilize WAL to patch operation history Signed-off-by: Marek Siarkowicz --- tests/robustness/main_test.go | 26 +- tests/robustness/report/wal.go | 240 ++++++++++++++++++ tests/robustness/validate/patch_history.go | 132 +++++++++- .../robustness/validate/patch_history_test.go | 227 ++++++++++------- tests/robustness/validate/validate.go | 4 +- tests/robustness/validate/validate_test.go | 9 +- 6 files changed, 521 insertions(+), 117 deletions(-) create mode 100644 tests/robustness/report/wal.go diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index ccd853b7c241..fbdb021dbaa3 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -64,18 +64,18 @@ func TestRobustnessRegression(t *testing.T) { } func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) { - report := report.TestReport{Logger: lg} + r := report.TestReport{Logger: lg} var err error - report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster)) + r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster)) if err != nil { t.Fatal(err) } - defer report.Cluster.Close() + defer r.Cluster.Close() if s.failpoint == nil { - s.failpoint = failpoint.PickRandom(t, report.Cluster) + s.failpoint = failpoint.PickRandom(t, r.Cluster) } else { - err = failpoint.Validate(report.Cluster, s.failpoint) + err = failpoint.Validate(r.Cluster, s.failpoint) if err != nil { t.Fatal(err) } @@ -86,15 +86,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce // Refer to: https://github.com/golang/go/issues/49929 panicked := true defer func() { - report.Report(t, panicked) + r.Report(t, panicked) }() - report.Client = s.run(ctx, t, lg, report.Cluster) - forcestopCluster(report.Cluster) + r.Client = s.run(ctx, t, lg, r.Cluster) + persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster) + if err != nil { + t.Error(err) + } + forcestopCluster(r.Cluster) - watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 - validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled) + watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 + validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} - report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute) + r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute) panicked = false } diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go new file mode 100644 index 000000000000..e41a244a0d73 --- /dev/null +++ b/tests/robustness/report/wal.go @@ -0,0 +1,240 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package report + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/google/go-cmp/cmp" + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/server/v3/storage/wal" + "go.etcd.io/etcd/server/v3/storage/wal/walpb" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/model" + "go.etcd.io/raft/v3/raftpb" +) + +func LoadClusterPersistedRequests(lg *zap.Logger, path string) ([]model.EtcdRequest, error) { + files, err := os.ReadDir(path) + if err != nil { + return nil, err + } + dataDirs := []string{} + for _, file := range files { + if file.IsDir() && strings.HasPrefix(file.Name(), "server-") { + dataDirs = append(dataDirs, filepath.Join(path, file.Name())) + } + } + return PersistedRequestsDirs(lg, dataDirs) +} + +func PersistedRequestsCluster(lg *zap.Logger, cluster *e2e.EtcdProcessCluster) ([]model.EtcdRequest, error) { + dataDirs := []string{} + for _, proc := range cluster.Procs { + dataDirs = append(dataDirs, proc.Config().DataDirPath) + } + return PersistedRequestsDirs(lg, dataDirs) +} + +func PersistedRequestsDirs(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest, error) { + persistedRequests := []model.EtcdRequest{} + allowedFailures := len(dataDirs) / 2 + for _, dir := range dataDirs { + memberRequests, err := requestsPersistedInWAL(lg, dir) + if err != nil { + if allowedFailures < 1 { + return nil, err + } + allowedFailures-- + continue + } + minLength := min(len(persistedRequests), len(memberRequests)) + if diff := cmp.Diff(memberRequests[:minLength], persistedRequests[:minLength]); diff != "" { + return nil, fmt.Errorf("unexpected differences between wal entries, diff:\n%s", diff) + } + if len(memberRequests) > len(persistedRequests) { + persistedRequests = memberRequests + } + } + return persistedRequests, nil +} + +func requestsPersistedInWAL(lg *zap.Logger, dataDir string) ([]model.EtcdRequest, error) { + state, ents, err := ReadWAL(lg, dataDir) + if err != nil { + return nil, err + } + requests := make([]model.EtcdRequest, 0, len(ents)) + for _, ent := range ents { + if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 { + continue + } + if ent.Index > state.Commit { + break + } + request, err := parseEntryNormal(ent) + if err != nil { + return nil, err + } + if request != nil { + requests = append(requests, *request) + } + } + return requests, nil +} + +func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raftpb.Entry, err error) { + walDir := filepath.Join(dataDir, "member", "wal") + repaired := false + for { + w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0}) + if err != nil { + lg.Fatal("failed to open WAL", zap.Error(err)) + } + _, state, ents, err = w.ReadAll() + w.Close() + if err != nil { + if errors.Is(err, wal.ErrSnapshotNotFound) { + return state, ents, nil + } + // we can only repair ErrUnexpectedEOF and we never repair twice. + if repaired || !errors.Is(err, io.ErrUnexpectedEOF) { + return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %s", err) + } + if !wal.Repair(lg, walDir) { + return state, nil, fmt.Errorf("failed to repair WAL, err: %s", err) + } + lg.Info("repaired WAL", zap.Error(err)) + repaired = true + continue + } + return state, ents, nil + } +} + +func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { + var raftReq pb.InternalRaftRequest + if err := raftReq.Unmarshal(ent.Data); err != nil { + var r pb.Request + isV2Entry := pbutil.MaybeUnmarshal(&r, ent.Data) + if !isV2Entry { + return nil, err + } + return nil, nil + } + switch { + case raftReq.Put != nil: + op := model.PutOptions{ + Key: string(raftReq.Put.Key), + Value: model.ToValueOrHash(string(raftReq.Put.Value)), + LeaseID: raftReq.Put.Lease, + } + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + {Type: model.PutOperation, Put: op}, + }, + }, + } + return &request, nil + case raftReq.DeleteRange != nil: + op := model.DeleteOptions{Key: string(raftReq.DeleteRange.Key)} + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + {Type: model.DeleteOperation, Delete: op}, + }, + }, + } + return &request, nil + case raftReq.LeaseGrant != nil: + return &model.EtcdRequest{ + Type: model.LeaseGrant, + LeaseGrant: &model.LeaseGrantRequest{LeaseID: raftReq.LeaseGrant.ID}, + }, nil + case raftReq.ClusterMemberAttrSet != nil: + return nil, nil + case raftReq.ClusterVersionSet != nil: + return nil, nil + case raftReq.Compaction != nil: + return nil, nil + case raftReq.Txn != nil: + txn := model.TxnRequest{} + for _, cmp := range raftReq.Txn.Compare { + txn.Conditions = append(txn.Conditions, model.EtcdCondition{ + Key: string(cmp.Key), + ExpectedRevision: cmp.GetModRevision(), + }) + } + for _, op := range raftReq.Txn.Success { + txn.OperationsOnSuccess = append(txn.OperationsOnSuccess, toEtcdOperation(op)) + } + for _, op := range raftReq.Txn.Failure { + txn.OperationsOnFailure = append(txn.OperationsOnFailure, toEtcdOperation(op)) + } + request := model.EtcdRequest{ + Type: model.Txn, + Txn: &txn, + } + return &request, nil + default: + panic(fmt.Sprintf("Unhandled raft request: %+v", raftReq)) + } +} + +func toEtcdOperation(op *pb.RequestOp) (operation model.EtcdOperation) { + switch { + case op.GetRequestRange() != nil: + rangeOp := op.GetRequestRange() + operation = model.EtcdOperation{ + Type: model.RangeOperation, + Range: model.RangeOptions{ + Start: string(rangeOp.Key), + End: string(rangeOp.RangeEnd), + Limit: rangeOp.Limit, + }, + } + case op.GetRequestPut() != nil: + putOp := op.GetRequestPut() + operation = model.EtcdOperation{ + Type: model.PutOperation, + Put: model.PutOptions{ + Key: string(putOp.Key), + Value: model.ToValueOrHash(string(putOp.Value)), + }, + } + case op.GetRequestDeleteRange() != nil: + deleteOp := op.GetRequestDeleteRange() + operation = model.EtcdOperation{ + Type: model.DeleteOperation, + Delete: model.DeleteOptions{ + Key: string(deleteOp.Key), + }, + } + default: + panic(fmt.Sprintf("Unknown op type %v", op)) + } + return operation +} diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 17e282c8b0a1..65093a907a3d 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -15,6 +15,8 @@ package validate import ( + "fmt" + "github.com/anishathalye/porcupine" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -22,10 +24,11 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func patchedOperationHistory(reports []report.ClientReport) []porcupine.Operation { +func patchedOperationHistory(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { allOperations := operations(reports) uniqueEvents := uniqueWatchEvents(reports) - return patchOperationsWithWatchEvents(allOperations, uniqueEvents) + operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests) + return patchOperations(allOperations, uniqueEvents, operationsReturnTime) } func operations(reports []report.ClientReport) []porcupine.Operation { @@ -54,28 +57,36 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.Ti return persisted } -func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation { - +func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents) for _, op := range operations { request := op.Input.(model.EtcdRequest) resp := op.Output.(model.MaybeEtcdResponse) - if resp.Error == "" || op.Call > lastObservedOperation.Call || request.Type != model.Txn { + if resp.Error == "" || request.Type != model.Txn { // Cannot patch those requests. newOperations = append(newOperations, op) continue } - event := matchWatchEvent(request.Txn, watchEvents) - if event != nil { + canMatchEvents := op.Call <= lastObservedOperation.Call + matchingEvent := matchWatchEvent(request.Txn, watchEvents) + if canMatchEvents && matchingEvent != nil { + eventTime := matchingEvent.Time.Nanoseconds() // Set revision and time based on watchEvent. - op.Return = event.Time.Nanoseconds() - op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: event.Revision}} - newOperations = append(newOperations, op) - continue + if eventTime < op.Return { + op.Return = eventTime + } + op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}} } - if !canBeDiscarded(request.Txn) { + persistedReturnTime := matchReturnTime(request, persistedOperations) + if persistedReturnTime != nil { + // Set return time based on persisted return time. + if *persistedReturnTime < op.Return { + op.Return = *persistedReturnTime + } + } + if persistedReturnTime != nil || (len(persistedOperations) == 0 && canMatchEvents && matchingEvent != nil) || !canBeDiscarded(request.Txn) { // Leave operation as it is as we cannot discard it. newOperations = append(newOperations, op) continue @@ -143,3 +154,100 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool { } return false } + +func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 { + operationReturnTime := operationReturnTime(allOperations) + persisted := map[model.EtcdOperation]int64{} + + lastReturnTime := maxReturnTime(operationReturnTime) + + for i := len(persistedRequests) - 1; i >= 0; i-- { + request := persistedRequests[i] + switch request.Type { + case model.Txn: + hasPut := false + lastReturnTime-- + for _, op := range request.Txn.OperationsOnSuccess { + if op.Type != model.PutOperation { + continue + } + if _, found := persisted[op]; found { + panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op)) + } + hasPut = true + persisted[op] = lastReturnTime + } + if hasPut { + newReturnTime := requestReturnTime(operationReturnTime, request) + lastReturnTime = min(lastReturnTime, newReturnTime) + } + case model.LeaseGrant: + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } + } + return persisted +} + +func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 { + newOperations := map[model.EtcdOperation]int64{} + for _, op := range operations { + request := op.Input.(model.EtcdRequest) + switch request.Type { + case model.Txn: + for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if etcdOp.Type != model.PutOperation { + continue + } + if _, found := newOperations[etcdOp]; found { + panic("Unexpected duplicate event in persisted requests.") + } + newOperations[etcdOp] = op.Return + } + case model.Range: + case model.LeaseGrant: + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } + } + return newOperations +} + +func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 { + var maxReturnTime int64 + for _, returnTime := range operationTime { + if returnTime > maxReturnTime { + maxReturnTime = returnTime + } + } + return maxReturnTime +} + +func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 { + switch request.Type { + case model.Txn: + for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if op.Type != model.PutOperation { + continue + } + if time, found := operationTime[op]; found { + return time + } + } + panic(fmt.Sprintf("Unknown return time for: %+v", request.Txn)) + default: + panic(fmt.Sprintf("Unknown request type: %q", request.Type)) + } +} + +func matchReturnTime(request model.EtcdRequest, persistedOperations map[model.EtcdOperation]int64) *int64 { + for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if etcdOp.Type != model.PutOperation { + continue + } + if returnTime, found := persistedOperations[etcdOp]; found { + return &returnTime + } + } + return nil +} diff --git a/tests/robustness/validate/patch_history_test.go b/tests/robustness/validate/patch_history_test.go index 9e3877033c01..2d2467c806b7 100644 --- a/tests/robustness/validate/patch_history_test.go +++ b/tests/robustness/validate/patch_history_test.go @@ -16,22 +16,21 @@ package validate import ( "errors" + "go.etcd.io/etcd/tests/v3/robustness/report" "testing" "time" - "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" - "go.etcd.io/etcd/tests/v3/robustness/report" ) func TestPatchHistory(t *testing.T) { for _, tc := range []struct { - name string - historyFunc func(baseTime time.Time, h *model.AppendableHistory) - event model.Event - expectRemains bool + name string + historyFunc func(baseTime time.Time, h *model.AppendableHistory) + persistedRequest *model.EtcdRequest + expectedRemainingOperations int }{ { name: "successful range remains", @@ -41,7 +40,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendRange("key", "", 0, 0, start, stop, &clientv3.GetResponse{}) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful put remains", @@ -51,7 +50,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, &clientv3.PutResponse{}, nil) }, - expectRemains: true, + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, + }, + expectedRemainingOperations: 1, }, { name: "failed put remains if there is a matching event", @@ -61,12 +74,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed put is dropped if event has different key", @@ -75,13 +97,26 @@ func TestPatchHistory(t *testing.T) { time.Sleep(time.Nanosecond) stop := time.Since(baseTime) h.AppendPut("key1", "value", start, stop, nil, errors.New("failed")) + start2 := time.Since(baseTime) + time.Sleep(time.Nanosecond) + stop2 := time.Since(baseTime) + h.AppendPut("key2", "value", start2, stop2, &clientv3.PutResponse{}, nil) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key2", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key2", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: false, + expectedRemainingOperations: 1, }, { name: "failed put is dropped if event has different value", @@ -90,13 +125,26 @@ func TestPatchHistory(t *testing.T) { time.Sleep(time.Nanosecond) stop := time.Since(baseTime) h.AppendPut("key", "value1", start, stop, nil, errors.New("failed")) + start2 := time.Since(baseTime) + time.Sleep(time.Nanosecond) + stop2 := time.Since(baseTime) + h.AppendPut("key", "value2", start2, stop2, &clientv3.PutResponse{}, nil) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value2"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value2"), + }, + }, + }, + }, }, - expectRemains: false, + expectedRemainingOperations: 1, }, { name: "failed put with lease remains if there is a matching event", @@ -106,12 +154,22 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + LeaseID: 123, + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed put is dropped", @@ -121,7 +179,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPut("key", "value", start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed put with lease is dropped", @@ -131,7 +189,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "successful delete remains", @@ -141,7 +199,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendDelete("key", start, stop, &clientv3.DeleteResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed delete remains", @@ -151,7 +209,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendDelete("key", start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful empty txn remains", @@ -161,7 +219,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, &clientv3.TxnResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed empty txn is dropped", @@ -171,7 +229,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put is dropped", @@ -181,7 +239,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put remains if there is a matching event", @@ -191,12 +249,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn delete remains", @@ -206,7 +273,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "successful txn put/delete remains", @@ -216,7 +283,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, &clientv3.TxnResponse{}, nil) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn put/delete remains", @@ -226,7 +293,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn delete/put remains", @@ -236,7 +303,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn empty/put is dropped", @@ -246,7 +313,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn empty/put remains if there is a matching event", @@ -256,12 +323,21 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - event: model.Event{ - Type: model.PutOperation, - Key: "key", - Value: model.ToValueOrHash("value"), + persistedRequest: &model.EtcdRequest{ + Type: model.Txn, + Txn: &model.TxnRequest{ + OperationsOnSuccess: []model.EtcdOperation{ + { + Type: model.PutOperation, + Put: model.PutOptions{ + Key: "key", + Value: model.ToValueOrHash("value"), + }, + }, + }, + }, }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn empty/delete remains", @@ -271,7 +347,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: true, + expectedRemainingOperations: 1, }, { name: "failed txn put&delete is dropped", @@ -281,7 +357,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn empty/put&delete is dropped", @@ -291,7 +367,7 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, { name: "failed txn put&delete/put&delete is dropped", @@ -301,55 +377,26 @@ func TestPatchHistory(t *testing.T) { stop := time.Since(baseTime) h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value2"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed")) }, - expectRemains: false, + expectedRemainingOperations: 0, }, } { t.Run(tc.name, func(t *testing.T) { baseTime := time.Now() history := model.NewAppendableHistory(identity.NewIDProvider()) tc.historyFunc(baseTime, history) - time.Sleep(time.Nanosecond) - start := time.Since(baseTime) - time.Sleep(time.Nanosecond) - stop := time.Since(baseTime) - history.AppendPut("tombstone", "true", start, stop, &clientv3.PutResponse{Header: &etcdserverpb.ResponseHeader{Revision: 3}}, nil) - watch := []model.WatchResponse{ - { - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: tc.event, Revision: 2, - }, - }, - }, - Revision: 2, - Time: time.Since(baseTime), - }, - { - Events: []model.WatchEvent{ - { - PersistedEvent: model.PersistedEvent{ - Event: model.Event{ - Type: model.PutOperation, - Key: "tombstone", - Value: model.ToValueOrHash("true"), - }, Revision: 3}, - }, - }, - Revision: 3, - Time: time.Since(baseTime), - }, + requests := []model.EtcdRequest{} + if tc.persistedRequest != nil { + requests = append(requests, *tc.persistedRequest) } operations := patchedOperationHistory([]report.ClientReport{ { ClientID: 0, KeyValue: history.History.Operations(), - Watch: []model.WatchOperation{{Responses: watch}}, + Watch: []model.WatchOperation{}, }, - }) - remains := len(operations) == history.Len() - if remains != tc.expectRemains { - t.Errorf("Unexpected remains, got: %v, want: %v", remains, tc.expectRemains) + }, requests) + if len(operations) != tc.expectedRemainingOperations { + t.Errorf("Unexpected remains, got: %d, want: %d", len(operations), tc.expectedRemainingOperations) } }) } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index 3e27ba49164d..f47d505ab978 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -29,8 +29,8 @@ import ( ) // ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private. -func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) { - patchedOperations := patchedOperationHistory(reports) +func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest, timeout time.Duration) (visualize func(basepath string) error) { + patchedOperations := patchedOperationHistory(reports, persistedRequests) linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout) if linearizable != porcupine.Ok { t.Error("Failed linearization, skipping further validation") diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index 8bb3bd7231f8..026502f26c26 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -32,13 +32,18 @@ func TestValidate(t *testing.T) { testdataPath := testutils.MustAbsPath("../testdata/") files, err := os.ReadDir(testdataPath) assert.NoError(t, err) - assert.GreaterOrEqual(t, len(files), 1) for _, file := range files { t.Run(file.Name(), func(t *testing.T) { + lg := zaptest.NewLogger(t) path := filepath.Join(testdataPath, file.Name()) reports, err := report.LoadClientReports(path) assert.NoError(t, err) - visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, 5*time.Minute) + + persistedRequests, err := report.LoadClusterPersistedRequests(lg, path) + if err != nil { + t.Error(err) + } + visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute) if t.Failed() { err := visualize(filepath.Join(path, "history.html"))