diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ad17b2be7ace..f937c5dcdcf9 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -372,11 +372,11 @@ func (s *watchableStore) syncWatchers() int { victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) for w := range wg.watchers { - if w.minRev < compactionRev { - // Skip the watcher that failed to send compacted watch response due to w.ch is full. - // Next retry of syncWatchers would try to resend the compacted watch response to w.ch - continue - } + //if w.minRev < compactionRev { + // // Skip the watcher that failed to send compacted watch response due to w.ch is full. + // // Next retry of syncWatchers would try to resend the compacted watch response to w.ch + // continue + //} w.minRev = curRev + 1 eb, ok := wb[w] diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index af5c437c7c75..0c30408c5d1b 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -393,7 +393,7 @@ func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string) } httpClient := http.Client{ // TODO: Decrease after deactivate is not blocked by sleep https://github.com/etcd-io/gofail/issues/64 - Timeout: 2 * time.Second, + Timeout: 3 * time.Second, } if f.clientTimeout != 0 { httpClient.Timeout = f.clientTimeout diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index 12a72b69d96a..3b6da8b503b2 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -37,20 +37,21 @@ const ( var ( allFailpoints = []Failpoint{ - KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, - DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, - BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, - BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, - CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, - CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, - RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, - RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, - BeforeApplyOneConfChangeSleep, - MemberReplace, - DropPeerNetwork, - RaftBeforeSaveSleep, - RaftAfterSaveSleep, - ApplyBeforeOpenSnapshot, + //KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, + //DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, + //BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, + //BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, + //CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, + //CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, + //RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, + //RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, + //BeforeApplyOneConfChangeSleep, + //MemberReplace, + //DropPeerNetwork, + //RaftBeforeSaveSleep, + //RaftAfterSaveSleep, + //ApplyBeforeOpenSnapshot, + beforeSendWatchResponse, } ) diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go index 3d90c5ddd8f8..c83432615077 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -57,6 +57,7 @@ var ( BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second} RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second} RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second} + beforeSendWatchResponse Failpoint = gofailSleepAndDeactivate{"beforeSendWatchResponse", time.Second} ) type goPanicFailpoint struct { @@ -209,13 +210,14 @@ func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg * lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err)) return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) } - time.Sleep(f.time) + time.Sleep(2 * time.Second) lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name())) err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint) if err != nil { lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err)) return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) } + time.Sleep(2 * time.Second) return nil } diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index fbdb021dbaa3..9311b7806629 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -95,8 +95,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce } forcestopCluster(r.Cluster) - watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 - validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) + //watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 + //validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute) @@ -125,17 +125,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu lg.Info("Finished injecting failures") return nil }) - maxRevisionChan := make(chan int64, 1) + //maxRevisionChan := make(chan int64, 1) + //g.Go(func() error { + // watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids) + // return nil + //}) g.Go(func() error { - defer close(maxRevisionChan) + //defer close(maxRevisionChan) operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids) - maxRevision := operationsMaxRevision(operationReport) - maxRevisionChan <- maxRevision - lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision)) - return nil - }) - g.Go(func() error { - watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids) + //maxRevisionChan <- operationsMaxRevision(operationReport) + time.Sleep(time.Second) return nil }) g.Wait() diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 9b92d3bcfc42..ada1fb10bc7f 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision) case Txn: return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision) - case LeaseGrant, LeaseRevoke, Defragment: + case LeaseGrant, LeaseRevoke, Defragment, Compact: if response.Revision == 0 { return "ok" } @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string { return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID) case Defragment: return fmt.Sprintf("defragment()") + case Compact: + return fmt.Sprintf("compact(%d)", request.Compact.Revision) default: return fmt.Sprintf("", request.Type) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index 9b743c9a8f64..b554e5ada0cb 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{ } type EtcdState struct { - Revision int64 - KeyValues map[string]ValueRevision - KeyLeases map[string]int64 - Leases map[int64]EtcdLease + Revision int64 + CompactRevision int64 + KeyValues map[string]ValueRevision + KeyLeases map[string]int64 + Leases map[int64]EtcdLease } func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) { @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} case Defragment: return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}} + case Compact: + s.CompactRevision = request.Compact.Revision + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}} default: panic(fmt.Sprintf("Unknown request type: %v", request.Type)) } @@ -234,6 +238,7 @@ type RequestType string const ( Range RequestType = "range" Txn RequestType = "txn" + Compact RequestType = "compact" LeaseGrant RequestType = "leaseGrant" LeaseRevoke RequestType = "leaseRevoke" Defragment RequestType = "defragment" @@ -246,6 +251,7 @@ type EtcdRequest struct { Range *RangeRequest Txn *TxnRequest Defragment *DefragmentRequest + Compact *CompactRequest } type RangeRequest struct { @@ -269,6 +275,13 @@ type DeleteOptions struct { Key string } +type CompactResponse struct { +} + +type CompactRequest struct { + Revision int64 +} + type TxnRequest struct { Conditions []EtcdCondition OperationsOnSuccess []EtcdOperation @@ -322,6 +335,7 @@ type EtcdResponse struct { LeaseGrant *LeaseGrantReponse LeaseRevoke *LeaseRevokeResponse Defragment *DefragmentResponse + Compact *CompactResponse Revision int64 } diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 936597c0e17f..0e73c376eb64 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r }) } +func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) { + request := compactRequest(rev) + if err != nil { + h.appendFailed(request, start.Nanoseconds(), err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.appendSuccessful(porcupine.Operation{ + ClientId: h.streamID, + Input: request, + Call: start.Nanoseconds(), + Output: compactResponse(revision), + Return: end.Nanoseconds(), + }) +} + func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) { conds := []EtcdCondition{} for _, cmp := range cmp { @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}} } +func compactRequest(rev int64) EtcdRequest { + return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}} +} + func deleteRequest(key string) EtcdRequest { return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}} } @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}} } +func compactResponse(revision int64) MaybeEtcdResponse { + return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}} +} + func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest { return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil) } diff --git a/tests/robustness/scenarios.go b/tests/robustness/scenarios.go index 01cd00c8b2b6..a034a370da31 100644 --- a/tests/robustness/scenarios.go +++ b/tests/robustness/scenarios.go @@ -32,18 +32,18 @@ type TrafficProfile struct { } var trafficProfiles = []TrafficProfile{ - { - Traffic: traffic.EtcdPut, - Profile: traffic.HighTrafficProfile, - }, - { - Traffic: traffic.EtcdPutDeleteLease, - Profile: traffic.LowTraffic, - }, - { - Traffic: traffic.Kubernetes, - Profile: traffic.HighTrafficProfile, - }, + //{ + // Traffic: traffic.EtcdPut, + // Profile: traffic.HighTrafficProfile, + //}, + //{ + // Traffic: traffic.EtcdPutDeleteLease, + // Profile: traffic.LowTraffic, + //}, + //{ + // Traffic: traffic.Kubernetes, + // Profile: traffic.HighTrafficProfile, + //}, { Traffic: traffic.Kubernetes, Profile: traffic.LowTraffic, diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 2cfab6a368ab..52e58f164c91 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -17,6 +17,7 @@ package traffic import ( "context" "fmt" + "strings" "sync" "time" @@ -135,6 +136,19 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del return resp, err } +func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + callTime := time.Since(c.baseTime) + fmt.Printf("Compact %d\n", rev) + resp, err := c.client.Compact(ctx, rev) + returnTime := time.Since(c.baseTime) + if err == nil || !strings.Contains(err.Error(), "mvcc: required revision has been compacted") { + c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err) + } + return resp, err +} + func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { txn := c.client.Txn(ctx).If( conditions..., diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index ff01650035d7..aef47eef71c8 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -36,9 +36,10 @@ var ( resource: "pods", namespace: "default", writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 90}, + {choice: KubernetesUpdate, weight: 89}, {choice: KubernetesDelete, weight: 5}, {choice: KubernetesCreate, weight: 5}, + {choice: KubernetesCompact, weight: 1}, }, } ) @@ -63,6 +64,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter s := newStorage() keyPrefix := "/registry/" + t.resource + "/" g := errgroup.Group{} + limit := int64(t.averageKeyCount) g.Go(func() error { for { @@ -73,7 +75,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter return nil default: } - rev, err := t.Read(ctx, kc, s, limiter, keyPrefix) + rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, limit) if err != nil { continue } @@ -92,7 +94,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter } // Avoid multiple failed writes in a row if lastWriteFailed { - _, err := t.Read(ctx, kc, s, limiter, keyPrefix) + _, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0) if err != nil { continue } @@ -107,8 +109,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter g.Wait() } -func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) { - limit := int64(t.averageKeyCount) +func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int64) (rev int64, err error) { rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) hasMore := true @@ -166,6 +167,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID())) + case KubernetesCompact: + err = kc.Compact(writeCtx, rev) default: panic(fmt.Sprintf("invalid choice: %q", op)) } @@ -208,9 +211,10 @@ func (t kubernetesTraffic) generateKey() string { type KubernetesRequestType string const ( - KubernetesDelete KubernetesRequestType = "delete" - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesCompact KubernetesRequestType = "compact" ) type kubernetesClient struct { @@ -249,6 +253,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error { return k.client.RequestProgress(clientv3.WithRequireLeader(ctx)) } +func (k kubernetesClient) Compact(ctx context.Context, rev int64) error { + _, err := k.client.Compact(ctx, rev) + return err +} + // Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing. // However, if the keys value changed it wants imminently to read it, thus the Get operation on failure. func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) { diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 7e4d8d69f71e..f6f7f71babaf 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -31,7 +31,7 @@ import ( var ( DefaultLeaseTTL int64 = 7200 RequestTimeout = 40 * time.Millisecond - WatchTimeout = 400 * time.Millisecond + WatchTimeout = 10 * time.Second MultiOpTxnOpCount = 4 LowTraffic = Profile{ diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 65093a907a3d..31744bd469b9 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -179,9 +179,12 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste } if hasPut { newReturnTime := requestReturnTime(operationReturnTime, request) - lastReturnTime = min(lastReturnTime, newReturnTime) + if newReturnTime != -1 { + lastReturnTime = min(lastReturnTime, newReturnTime) + } } case model.LeaseGrant: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -206,6 +209,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati } case model.Range: case model.LeaseGrant: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -234,7 +238,7 @@ func requestReturnTime(operationTime map[model.EtcdOperation]int64, request mode return time } } - panic(fmt.Sprintf("Unknown return time for: %+v", request.Txn)) + return -1 default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index f47d505ab978..24328339878a 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -44,7 +44,7 @@ func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, report return visualize } validateWatch(t, lg, cfg, reports, eventHistory) - validateSerializableOperations(t, lg, patchedOperations, eventHistory) + //validateSerializableOperations(t, lg, patchedOperations, eventHistory) return visualize } diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index eeb00c6cb887..471b6230c0ec 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -34,8 +34,8 @@ func validateWatch(t *testing.T, lg *zap.Logger, cfg Config, reports []report.Cl if eventHistory != nil { validateReliable(t, eventHistory, r) validateResumable(t, eventHistory, r) - validatePrevKV(t, r, eventHistory) - validateCreateEvent(t, r, eventHistory) + //validatePrevKV(t, r, eventHistory) + //validateCreateEvent(t, r, eventHistory) } } }