Skip to content

Commit

Permalink
Merge pull request #13000 from ptabor/20210519-shouldApplyV3
Browse files Browse the repository at this point in the history
applyV2 should apply on backend only once
  • Loading branch information
ptabor authored May 19, 2021
2 parents 46b49a6 + 0a972a3 commit 788bc53
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 11 deletions.
12 changes: 6 additions & 6 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const v2Version = "v2"
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}

func (a *applierV2store) Put(r *RequestV2) Response {
func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
Expand All @@ -89,7 +89,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, true)
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
Expand All @@ -98,7 +98,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
}
Expand All @@ -117,7 +117,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response {

// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
Expand All @@ -132,7 +132,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r)
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,14 +2181,14 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
return
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))

if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req))
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
return
}

Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TestApplyRequest(t *testing.T) {
v2store: st,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
resp := srv.applyV2Request((*RequestV2)(&tt.req))
resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth)

if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
Path: membership.MemberAttributesStorePath(1),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyV2Request((*RequestV2)(&req))
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)
Expand Down
3 changes: 2 additions & 1 deletion server/etcdserver/v2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
)

Expand Down Expand Up @@ -52,7 +53,7 @@ func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, e
}

func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Put(r), nil
return a.applier.Put(r, membership.ApplyBoth), nil
}

func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
Expand Down

0 comments on commit 788bc53

Please sign in to comment.