From 0a972a3f052f3dcb4743444477d48212ccc8bd49 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 18 May 2021 20:00:55 +0200 Subject: [PATCH] applyV2 should reapply on backend only once During review of: https://github.com/etcd-io/etcd/pull/12988 spotted that PUT is actially writing to v3-backend. If we are replaying WAL log, it might happened that backend's applied_index is > than the WAL's log entry. In such situation we should skip applying on backend V3. I think both the methods (setVersion, setMembersAttributes) are in practice idempotent so its not that 'serious' problem, but for formal correctness adding the proper checks. --- server/etcdserver/apply_v2.go | 12 ++++++------ server/etcdserver/server.go | 4 ++-- server/etcdserver/server_test.go | 4 ++-- server/etcdserver/v2_server.go | 3 ++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index 6648e3279ee..2f5913f18c6 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -36,7 +36,7 @@ const v2Version = "v2" type ApplierV2 interface { Delete(r *RequestV2) Response Post(r *RequestV2) Response - Put(r *RequestV2) Response + Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response QGet(r *RequestV2) Response Sync(r *RequestV2) Response } @@ -67,7 +67,7 @@ func (a *applierV2store) Post(r *RequestV2) Response { return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions())) } -func (a *applierV2store) Put(r *RequestV2) Response { +func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { ttlOptions := r.TTLOptions() exists, existsSet := pbutil.GetBool(r.PrevExist) switch { @@ -89,7 +89,7 @@ func (a *applierV2store) Put(r *RequestV2) Response { a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr, true) + a.cluster.UpdateAttributes(id, attr, shouldApplyV3) } // return an empty response since there is no consumer. return Response{} @@ -98,7 +98,7 @@ func (a *applierV2store) Put(r *RequestV2) Response { if r.Path == membership.StoreClusterVersionKey() { if a.cluster != nil { // persist to backend given v2store can be very stale - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth) + a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) } return Response{} } @@ -117,7 +117,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response { // applyV2Request interprets r as a call to v2store.X // and returns a Response interpreted from v2store.Event -func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) { +func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { stringer := panicAlternativeStringer{ stringer: r, alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, @@ -132,7 +132,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) { case "POST": return s.applyV2.Post(r) case "PUT": - return s.applyV2.Put(r) + return s.applyV2.Put(r, shouldApplyV3) case "DELETE": return s.applyV2.Delete(r) case "QGET": diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2c8855ec33d..56e288cc5f9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2181,14 +2181,14 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp))) + s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) return } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) - s.w.Trigger(req.ID, s.applyV2Request(req)) + s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) return } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 13ec0dd8901..c5a61da4736 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -472,7 +472,7 @@ func TestApplyRequest(t *testing.T) { v2store: st, } srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - resp := srv.applyV2Request((*RequestV2)(&tt.req)) + resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -500,7 +500,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { Path: membership.MemberAttributesStorePath(1), Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, } - srv.applyV2Request((*RequestV2)(&req)) + srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth) w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { t.Errorf("attributes = %v, want %v", g, w) diff --git a/server/etcdserver/v2_server.go b/server/etcdserver/v2_server.go index 7372823c0b3..24c97c924a5 100644 --- a/server/etcdserver/v2_server.go +++ b/server/etcdserver/v2_server.go @@ -19,6 +19,7 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" ) @@ -52,7 +53,7 @@ func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, e } func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Put(r), nil + return a.applier.Put(r, membership.ApplyBoth), nil } func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {