Skip to content

Commit

Permalink
Draft reproduce issue etcd-io#17529
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 7, 2024
1 parent a7f0ed2 commit b8adab8
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 63 deletions.
10 changes: 5 additions & 5 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,11 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
//if w.minRev < compactionRev {
// // Skip the watcher that failed to send compacted watch response due to w.ch is full.
// // Next retry of syncWatchers would try to resend the compacted watch response to w.ch
// continue
//}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (f *BinaryFailpoints) DeactivateHTTP(ctx context.Context, failpoint string)
}
httpClient := http.Client{
// TODO: Decrease after deactivate is not blocked by sleep https://github.com/etcd-io/gofail/issues/64
Timeout: 2 * time.Second,
Timeout: 3 * time.Second,
}
if f.clientTimeout != 0 {
httpClient.Timeout = f.clientTimeout
Expand Down
29 changes: 15 additions & 14 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ const (

var (
allFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
BeforeApplyOneConfChangeSleep,
MemberReplace,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
ApplyBeforeOpenSnapshot,
//KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
//DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
//BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
//BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
//CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
//CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
//RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
//RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
//BeforeApplyOneConfChangeSleep,
//MemberReplace,
//DropPeerNetwork,
//RaftBeforeSaveSleep,
//RaftAfterSaveSleep,
//ApplyBeforeOpenSnapshot,
beforeSendWatchResponse,
}
)

Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
beforeSendWatchResponse Failpoint = gofailSleepAndDeactivate{"beforeSendWatchResponse", time.Second}
)

type goPanicFailpoint struct {
Expand Down Expand Up @@ -209,13 +210,14 @@ func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *
lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err)
}
time.Sleep(f.time)
time.Sleep(2 * time.Second)
lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name()))
err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint)
if err != nil {
lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err))
return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err)
}
time.Sleep(2 * time.Second)
return nil
}

Expand Down
21 changes: 10 additions & 11 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
}
forcestopCluster(r.Cluster)

watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
//watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
//validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)

Expand Down Expand Up @@ -125,17 +125,16 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
lg.Info("Finished injecting failures")
return nil
})
maxRevisionChan := make(chan int64, 1)
//maxRevisionChan := make(chan int64, 1)
//g.Go(func() error {
// watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
// return nil
//})
g.Go(func() error {
defer close(maxRevisionChan)
//defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
return nil
})
g.Go(func() error {
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
//maxRevisionChan <- operationsMaxRevision(operationReport)
time.Sleep(time.Second)
return nil
})
g.Wait()
Expand Down
4 changes: 3 additions & 1 deletion tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
case Txn:
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
case LeaseGrant, LeaseRevoke, Defragment:
case LeaseGrant, LeaseRevoke, Defragment, Compact:
if response.Revision == 0 {
return "ok"
}
Expand Down Expand Up @@ -63,6 +63,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
22 changes: 18 additions & 4 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand Down Expand Up @@ -178,6 +179,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}}
case Compact:
s.CompactRevision = request.Compact.Revision
return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -234,6 +238,7 @@ type RequestType string
const (
Range RequestType = "range"
Txn RequestType = "txn"
Compact RequestType = "compact"
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Expand All @@ -246,6 +251,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

type RangeRequest struct {
Expand All @@ -269,6 +275,13 @@ type DeleteOptions struct {
Key string
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}

type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
Expand Down Expand Up @@ -322,6 +335,7 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
Revision int64
}

Expand Down
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: compactResponse(revision),
Return: end.Nanoseconds(),
})
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
Expand Down Expand Up @@ -405,6 +424,10 @@ func putResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{}}}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func deleteRequest(key string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Delete: DeleteOptions{Key: key}}}}}
}
Expand All @@ -413,6 +436,10 @@ func deleteResponse(deleted int64, revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil)
}
Expand Down
24 changes: 12 additions & 12 deletions tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ type TrafficProfile struct {
}

var trafficProfiles = []TrafficProfile{
{
Traffic: traffic.EtcdPut,
Profile: traffic.HighTrafficProfile,
},
{
Traffic: traffic.EtcdPutDeleteLease,
Profile: traffic.LowTraffic,
},
{
Traffic: traffic.Kubernetes,
Profile: traffic.HighTrafficProfile,
},
//{
// Traffic: traffic.EtcdPut,
// Profile: traffic.HighTrafficProfile,
//},
//{
// Traffic: traffic.EtcdPutDeleteLease,
// Profile: traffic.LowTraffic,
//},
//{
// Traffic: traffic.Kubernetes,
// Profile: traffic.HighTrafficProfile,
//},
{
Traffic: traffic.Kubernetes,
Profile: traffic.LowTraffic,
Expand Down
14 changes: 14 additions & 0 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package traffic
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -135,6 +136,19 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del
return resp, err
}

func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
fmt.Printf("Compact %d\n", rev)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
if err == nil || !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
}
return resp, err
}

func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
txn := c.client.Txn(ctx).If(
conditions...,
Expand Down
25 changes: 17 additions & 8 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 89},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 1},
},
}
)
Expand All @@ -63,6 +64,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
g := errgroup.Group{}
limit := int64(t.averageKeyCount)

g.Go(func() error {
for {
Expand All @@ -73,7 +75,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
return nil
default:
}
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix)
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, limit)
if err != nil {
continue
}
Expand All @@ -92,7 +94,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
}
// Avoid multiple failed writes in a row
if lastWriteFailed {
_, err := t.Read(ctx, kc, s, limiter, keyPrefix)
_, err := t.Read(ctx, kc, s, limiter, keyPrefix, 0)
if err != nil {
continue
}
Expand All @@ -107,8 +109,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
g.Wait()
}

func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string) (rev int64, err error) {
limit := int64(t.averageKeyCount)
func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int64) (rev int64, err error) {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)

hasMore := true
Expand Down Expand Up @@ -166,6 +167,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand Down Expand Up @@ -208,9 +211,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string

const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)

type kubernetesClient struct {
Expand Down Expand Up @@ -249,6 +253,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}

func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
Expand Down
Loading

0 comments on commit b8adab8

Please sign in to comment.