From 9c02d763d4b9859b7a648f3fd1050d126e84fcc1 Mon Sep 17 00:00:00 2001 From: Geeta Gharpure Date: Fri, 2 Jun 2023 22:14:42 +0000 Subject: [PATCH] Try out removing v2 applier Signed-off-by: Geeta Gharpure --- server/etcdserver/apply_v2.go | 166 ------------------------------ server/etcdserver/server.go | 87 ++-------------- server/etcdserver/server_test.go | 22 ++-- server/etcdserver/v2_server.go | 167 ------------------------------- 4 files changed, 19 insertions(+), 423 deletions(-) delete mode 100644 server/etcdserver/apply_v2.go delete mode 100644 server/etcdserver/v2_server.go diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go deleted file mode 100644 index c9e4c3e87b0..00000000000 --- a/server/etcdserver/apply_v2.go +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdserver - -import ( - "encoding/json" - "fmt" - "path" - "time" - "unicode/utf8" - - "github.com/coreos/go-semver/semver" - - "go.etcd.io/etcd/pkg/v3/pbutil" - "go.etcd.io/etcd/server/v3/etcdserver/api" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/etcdserver/errors" - "go.etcd.io/etcd/server/v3/etcdserver/txn" - - "go.uber.org/zap" -) - -const v2Version = "v2" - -// ApplierV2 is the interface for processing V2 raft messages -type ApplierV2 interface { - Delete(r *RequestV2) Response - Post(r *RequestV2) Response - Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response - QGet(r *RequestV2) Response - Sync(r *RequestV2) Response -} - -func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 { - if lg == nil { - lg = zap.NewNop() - } - return &applierV2store{lg: lg, store: s, cluster: c} -} - -type applierV2store struct { - lg *zap.Logger - store v2store.Store - cluster *membership.RaftCluster -} - -func (a *applierV2store) Delete(r *RequestV2) Response { - switch { - case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) - default: - return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive)) - } -} - -func (a *applierV2store) Post(r *RequestV2) Response { - return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions())) -} - -func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response { - ttlOptions := r.TTLOptions() - exists, existsSet := pbutil.GetBool(r.PrevExist) - switch { - case existsSet: - if exists { - if r.PrevIndex == 0 && r.PrevValue == "" { - return toResponse(a.store.Update(r.Path, r.Val, ttlOptions)) - } - return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) - } - return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) - case r.PrevIndex > 0 || r.PrevValue != "": - return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) - default: - if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path)) - var attr membership.Attributes - if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) - } - if a.cluster != nil { - a.cluster.UpdateAttributes(id, attr, shouldApplyV3) - } - // return an empty response since there is no consumer. - return Response{} - } - // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 - if r.Path == membership.StoreClusterVersionKey() { - if a.cluster != nil { - // persist to backend given v2store can be very stale - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) - } - return Response{} - } - return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) - } -} - -func (a *applierV2store) QGet(r *RequestV2) Response { - return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted)) -} - -func (a *applierV2store) Sync(r *RequestV2) Response { - a.store.DeleteExpiredKeys(time.Unix(0, r.Time)) - return Response{} -} - -// applyV2Request interprets r as a call to v2store.X -// and returns a Response interpreted from v2store.Event -func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) { - stringer := panicAlternativeStringer{ - stringer: r, - alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) }, - } - defer func(start time.Time) { - if !utf8.ValidString(r.Method) { - s.lg.Info("method is not valid utf-8") - return - } - success := resp.Err == nil - txn.ApplySecObserve(v2Version, r.Method, success, time.Since(start)) - txn.WarnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil) - }(time.Now()) - - switch r.Method { - case "POST": - return s.applyV2.Post(r) - case "PUT": - return s.applyV2.Put(r, shouldApplyV3) - case "DELETE": - return s.applyV2.Delete(r) - case "QGET": - return s.applyV2.QGet(r) - case "SYNC": - return s.applyV2.Sync(r) - default: - // This should never be reached, but just in case: - return Response{Err: errors.ErrUnknownMethod} - } -} - -func (r *RequestV2) TTLOptions() v2store.TTLOptionSet { - refresh, _ := pbutil.GetBool(r.Refresh) - ttlOptions := v2store.TTLOptionSet{Refresh: refresh} - if r.Expiration != 0 { - ttlOptions.ExpireTime = time.Unix(0, r.Expiration) - } - return ttlOptions -} - -func toResponse(ev *v2store.Event, err error) Response { - return Response{Event: ev, Err: err} -} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 71ee6b81804..9bed8e57c30 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -61,7 +61,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor" "go.etcd.io/etcd/server/v3/etcdserver/cindex" @@ -134,20 +133,11 @@ func init() { } type Response struct { - Term uint64 - Index uint64 - Event *v2store.Event - Watcher v2store.Watcher - Err error -} - -type ServerV2 interface { - Server - Leader() types.ID - - // Do takes a V2 request and attempts to fulfill it, returning a Response. - Do(ctx context.Context, r pb.Request) (Response, error) - ClientCertAuthEnabled() bool + Term uint64 + Index uint64 + //Event *v2store.Event + //Watcher v2store.Watcher + Err error } type ServerV3 interface { @@ -249,11 +239,8 @@ type EtcdServer struct { cluster *membership.RaftCluster - v2store v2store.Store snapshotter *snap.Snapshotter - applyV2 ApplierV2 - uberApply apply.UberApplier applyWait wait.WaitTime @@ -342,9 +329,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) - if srv.v2store != nil { - srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) - } srv.be = b.storage.backend.be srv.beHooks = b.storage.backend.beHooks @@ -441,10 +425,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.r.transport = tr - if srv.v2store != nil { - panic(fmt.Sprintf("This constructor should not have set v2store")) - } - return srv, nil } @@ -636,7 +616,7 @@ func (s *EtcdServer) Cluster() api.Cluster { return s.cluster } func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) } type ServerPeer interface { - ServerV2 + Server RaftHandler() http.Handler LeaseHandler() http.Handler } @@ -858,9 +838,7 @@ func (s *EtcdServer) run() { lg.Warn("data-dir used by this member must be removed") return case <-getSyncC(): - if s.v2store != nil && s.v2store.HasTTLKeys() { - s.sync(s.Cfg.ReqTimeout()) - } + lg.Warn("NOP") case <-s.stop: return } @@ -1895,20 +1873,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - if s.v2store != nil { - s.lg.Debug("V2request applyEntryNormal", zap.Stringer("raftReq", &raftReq)) - s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) - } + s.lg.Warn("V2request applyEntryNormal will be dropped", zap.Stringer("raftReq", &raftReq)) return } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { - req := (*RequestV2)(raftReq.V2) - s.lg.Debug("V2 applyEntryNormal", zap.Stringer("raftReq", &raftReq)) - if s.v2store != nil { - s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) - } + s.lg.Warn("V2 applyEntryNormal will be dropped", zap.Stringer("raftReq", &raftReq)) return } @@ -2264,46 +2235,6 @@ func (s *EtcdServer) monitorCompactHash() { } } -func (s *EtcdServer) updateClusterVersionV2(ver string) { - lg := s.Logger() - - if s.cluster.Version() == nil { - lg.Info( - "setting up initial cluster version using v2 API", - zap.String("cluster-version", version.Cluster(ver)), - ) - } else { - lg.Info( - "updating cluster version using v2 API", - zap.String("from", version.Cluster(s.cluster.Version().String())), - zap.String("to", version.Cluster(ver)), - ) - } - - req := pb.Request{ - Method: "PUT", - Path: membership.StoreClusterVersionKey(), - Val: ver, - } - - ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) - _, err := s.Do(ctx, req) - cancel() - - switch err { - case nil: - lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) - return - - case errors.ErrStopped: - lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) - return - - default: - lg.Warn("failed to update cluster version", zap.Error(err)) - } -} - func (s *EtcdServer) updateClusterVersionV3(ver string) { lg := s.Logger() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 93265007b00..18dae0a4e72 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -20,9 +20,6 @@ import ( "fmt" "math" "net/http" - "os" - "path" - "path/filepath" "reflect" "sync" "testing" @@ -34,10 +31,8 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/membershippb" - "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/types" - "go.etcd.io/etcd/client/pkg/v3/verify" "go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/pkg/v3/wait" @@ -46,17 +41,13 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/errors" - "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/mock/mockstorage" - "go.etcd.io/etcd/server/v3/mock/mockstore" "go.etcd.io/etcd/server/v3/mock/mockwait" serverstorage "go.etcd.io/etcd/server/v3/storage" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" - "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -64,6 +55,7 @@ import ( // TestDoLocalAction tests requests which do not need to go through raft to be applied, // and are served through local data. +/* func TestDoLocalAction(t *testing.T) { tests := []struct { req pb.Request @@ -123,9 +115,11 @@ func TestDoLocalAction(t *testing.T) { } } } +*/ // TestDoBadLocalAction tests server requests which do not need to go through consensus, // and return errors when they fetch from local data. +/* func TestDoBadLocalAction(t *testing.T) { storeErr := fmt.Errorf("bah") tests := []struct { @@ -178,6 +172,7 @@ func TestDoBadLocalAction(t *testing.T) { } } } +*/ /* Following test case crafts a V2 request and tests for repeat apply operation. @@ -249,7 +244,7 @@ func TestApplyRepeat(t *testing.T) { } } */ - +/* func TestApplyRequest(t *testing.T) { tests := []struct { req pb.Request @@ -492,6 +487,7 @@ func TestApplyRequest(t *testing.T) { } } } +*/ /* func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { @@ -790,6 +786,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { } } +/* func TestDoProposal(t *testing.T) { tests := []pb.Request{ {Method: "POST", ID: 1}, @@ -1316,7 +1313,6 @@ func TestTriggerSnap(t *testing.T) { srv.Stop() } -/* Following test case possibly needs to be rewritten to send out req type compatible with v3 // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with // proposals. @@ -1678,6 +1674,7 @@ func TestPublishV3Retry(t *testing.T) { <-ch } +/* func TestUpdateVersion(t *testing.T) { n := newNodeRecorder() ch := make(chan interface{}, 1) @@ -1700,7 +1697,7 @@ func TestUpdateVersion(t *testing.T) { ctx: ctx, cancel: cancel, } - srv.updateClusterVersionV2("2.0.0") + srv.updateClusterVersionV3("2.0.0") action := n.Action() if len(action) != 1 { @@ -1724,6 +1721,7 @@ func TestUpdateVersion(t *testing.T) { t.Errorf("val = %s, want %s", r.Val, "2.0.0") } } +*/ func TestStopNotify(t *testing.T) { s := &EtcdServer{ diff --git a/server/etcdserver/v2_server.go b/server/etcdserver/v2_server.go deleted file mode 100644 index 517d7ca7f70..00000000000 --- a/server/etcdserver/v2_server.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdserver - -import ( - "context" - "time" - - pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/etcdserver/errors" -) - -type RequestV2 pb.Request - -type RequestV2Handler interface { - Post(ctx context.Context, r *RequestV2) (Response, error) - Put(ctx context.Context, r *RequestV2) (Response, error) - Delete(ctx context.Context, r *RequestV2) (Response, error) - QGet(ctx context.Context, r *RequestV2) (Response, error) - Get(ctx context.Context, r *RequestV2) (Response, error) - Head(ctx context.Context, r *RequestV2) (Response, error) -} - -type reqV2HandlerEtcdServer struct { - reqV2HandlerStore - s *EtcdServer -} - -type reqV2HandlerStore struct { - store v2store.Store - applier ApplierV2 -} - -func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler { - return &reqV2HandlerStore{s, applier} -} - -func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Post(r), nil -} - -func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Put(r, membership.ApplyBoth), nil -} - -func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Delete(r), nil -} - -func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.QGet(r), nil -} - -func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) { - if r.Wait { - wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) - return Response{Watcher: wc}, err - } - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) { - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) { - data, err := ((*pb.Request)(r)).Marshal() - if err != nil { - return Response{}, err - } - ch := a.s.w.Register(r.ID) - - start := time.Now() - a.s.r.Propose(ctx, data) - proposalsPending.Inc() - defer proposalsPending.Dec() - - select { - case x := <-ch: - resp := x.(Response) - return resp, resp.Err - case <-ctx.Done(): - proposalsFailed.Inc() - a.s.w.Trigger(r.ID, nil) // GC wait - return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) - case <-a.s.stopping: - } - return Response{}, errors.ErrStopped -} - -func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { - r.ID = s.reqIDGen.Next() - h := &reqV2HandlerEtcdServer{ - reqV2HandlerStore: reqV2HandlerStore{ - store: s.v2store, - applier: s.applyV2, - }, - s: s, - } - rp := &r - resp, err := ((*RequestV2)(rp)).Handle(ctx, h) - resp.Term, resp.Index = s.Term(), s.CommittedIndex() - return resp, err -} - -// Handle interprets r and performs an operation on s.store according to r.Method -// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with -// Quorum == true, r will be sent through consensus before performing its -// respective operation. Do will block until an action is performed or there is -// an error. -func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) { - if r.Method == "GET" && r.Quorum { - r.Method = "QGET" - } - switch r.Method { - case "POST": - return v2api.Post(ctx, r) - case "PUT": - return v2api.Put(ctx, r) - case "DELETE": - return v2api.Delete(ctx, r) - case "QGET": - return v2api.QGet(ctx, r) - case "GET": - return v2api.Get(ctx, r) - case "HEAD": - return v2api.Head(ctx, r) - } - return Response{}, errors.ErrUnknownMethod -} - -func (r *RequestV2) String() string { - rpb := pb.Request(*r) - return rpb.String() -}