diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4b40e32bada..96098793626 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -140,8 +140,6 @@ type ServerV2 interface { Server Leader() types.ID - // Do takes a V2 request and attempts to fulfill it, returning a Response. - Do(ctx context.Context, r pb.Request) (Response, error) ClientCertAuthEnabled() bool } @@ -2267,46 +2265,6 @@ func (s *EtcdServer) monitorCompactHash() { } } -func (s *EtcdServer) updateClusterVersionV2(ver string) { - lg := s.Logger() - - if s.cluster.Version() == nil { - lg.Info( - "setting up initial cluster version using v2 API", - zap.String("cluster-version", version.Cluster(ver)), - ) - } else { - lg.Info( - "updating cluster version using v2 API", - zap.String("from", version.Cluster(s.cluster.Version().String())), - zap.String("to", version.Cluster(ver)), - ) - } - - req := pb.Request{ - Method: "PUT", - Path: membership.StoreClusterVersionKey(), - Val: ver, - } - - ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) - _, err := s.Do(ctx, req) - cancel() - - switch err { - case nil: - lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) - return - - case errors.ErrStopped: - lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) - return - - default: - lg.Warn("failed to update cluster version", zap.Error(err)) - } -} - func (s *EtcdServer) updateClusterVersionV3(ver string) { lg := s.Logger() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 7b945942976..62e32c77693 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -21,7 +21,6 @@ import ( "math" "net/http" "os" - "path" "path/filepath" "reflect" "sync" @@ -63,123 +62,6 @@ import ( "go.etcd.io/raft/v3/raftpb" ) -// TestDoLocalAction tests requests which do not need to go through raft to be applied, -// and are served through local data. -func TestDoLocalAction(t *testing.T) { - tests := []struct { - req pb.Request - - wresp Response - werr error - wactions []testutil.Action - }{ - { - pb.Request{Method: "GET", ID: 1, Wait: true}, - Response{Watcher: v2store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}}, - }, - { - pb.Request{Method: "GET", ID: 1}, - Response{Event: &v2store.Event{}}, nil, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "HEAD", ID: 1}, - Response{Event: &v2store.Event{}}, nil, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "BADMETHOD", ID: 1}, - Response{}, errors.ErrUnknownMethod, []testutil.Action{}, - }, - } - for i, tt := range tests { - st := mockstore.NewRecorder() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - resp, err := srv.Do(context.Background(), tt.req) - - if err != tt.werr { - t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr) - } - if !reflect.DeepEqual(resp, tt.wresp) { - t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) - } - gaction := st.Action() - if !reflect.DeepEqual(gaction, tt.wactions) { - t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions) - } - } -} - -// TestDoBadLocalAction tests server requests which do not need to go through consensus, -// and return errors when they fetch from local data. -func TestDoBadLocalAction(t *testing.T) { - storeErr := fmt.Errorf("bah") - tests := []struct { - req pb.Request - - wactions []testutil.Action - }{ - { - pb.Request{Method: "GET", ID: 1, Wait: true}, - []testutil.Action{{Name: "Watch"}}, - }, - { - pb.Request{Method: "GET", ID: 1}, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - { - pb.Request{Method: "HEAD", ID: 1}, - []testutil.Action{ - { - Name: "Get", - Params: []any{"", false, false}, - }, - }, - }, - } - for i, tt := range tests { - st := mockstore.NewErrRecorder(storeErr) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - resp, err := srv.Do(context.Background(), tt.req) - - if err != storeErr { - t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr) - } - if !reflect.DeepEqual(resp, Response{}) { - t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{}) - } - gaction := st.Action() - if !reflect.DeepEqual(gaction, tt.wactions) { - t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions) - } - } -} - // TestApplyRepeat tests that server handles repeat raft messages gracefully func TestApplyRepeat(t *testing.T) { lg := zaptest.NewLogger(t) @@ -796,115 +678,6 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { } } -func TestDoProposal(t *testing.T) { - tests := []pb.Request{ - {Method: "POST", ID: 1}, - {Method: "PUT", ID: 1}, - {Method: "DELETE", ID: 1}, - {Method: "GET", ID: 1, Quorum: true}, - } - for i, tt := range tests { - st := mockstore.NewRecorder() - r := newRaftNode(raftNodeConfig{ - lg: zaptest.NewLogger(t), - Node: newNodeCommitter(), - storage: mockstorage.NewStorageRecorder(""), - raftStorage: raft.NewMemoryStorage(), - transport: newNopTransporter(), - }) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewFakeConsistentIndex(0), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - srv.start() - resp, err := srv.Do(context.Background(), tt) - srv.Stop() - - action := st.Action() - if len(action) != 1 { - t.Errorf("#%d: len(action) = %d, want 1", i, len(action)) - } - if err != nil { - t.Fatalf("#%d: err = %v, want nil", i, err) - } - // resp.Index is set in Do() based on the raft state; may either be 0 or 1 - wresp := Response{Event: &v2store.Event{}, Index: resp.Index} - if !reflect.DeepEqual(resp, wresp) { - t.Errorf("#%d: resp = %v, want %v", i, resp, wresp) - } - } -} - -func TestDoProposalCancelled(t *testing.T) { - wt := mockwait.NewRecorder() - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), - w: wt, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) - - if err != errors.ErrCanceled { - t.Fatalf("err = %v, want %v", err, errors.ErrCanceled) - } - w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}} - if !reflect.DeepEqual(wt.Action(), w) { - t.Errorf("wt.action = %+v, want %+v", wt.Action(), w) - } -} - -func TestDoProposalTimeout(t *testing.T) { - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), - w: mockwait.NewNop(), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - ctx, cancel := context.WithTimeout(context.Background(), 0) - _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) - cancel() - if err != errors.ErrTimeout { - t.Fatalf("err = %v, want %v", err, errors.ErrTimeout) - } -} - -func TestDoProposalStopped(t *testing.T) { - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: newNodeNop()}), - w: mockwait.NewNop(), - reqIDGen: idutil.NewGenerator(0, time.Time{}), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - srv.stopping = make(chan struct{}) - close(srv.stopping) - _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1}) - if err != errors.ErrStopped { - t.Errorf("err = %v, want %v", err, errors.ErrStopped) - } -} - // TestSync tests sync 1. is nonblocking 2. proposes SYNC request. func TestSync(t *testing.T) { n := newNodeRecorder() @@ -1191,73 +964,6 @@ func TestSnapshotOrdering(t *testing.T) { } } -// TestTriggerSnap for Applied > SnapshotCount should trigger a SaveSnap event -func TestTriggerSnap(t *testing.T) { - be, tmpPath := betesting.NewDefaultTmpBackend(t) - defer func() { - os.RemoveAll(tmpPath) - }() - - snapc := 10 - st := mockstore.NewRecorder() - p := mockstorage.NewStorageRecorderStream("") - r := newRaftNode(raftNodeConfig{ - lg: zaptest.NewLogger(t), - Node: newNodeCommitter(), - raftStorage: raft.NewMemoryStorage(), - storage: p, - transport: newNopTransporter(), - }) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - consistIndex: cindex.NewConsistentIndex(be), - } - srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - - srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) - srv.be = be - - cl := membership.NewCluster(zaptest.NewLogger(t)) - srv.cluster = cl - - srv.start() - - donec := make(chan struct{}) - go func() { - defer close(donec) - wcnt := 3 + snapc - gaction, _ := p.Wait(wcnt) - - // each operation is recorded as a Save - // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release - if len(gaction) != wcnt { - t.Logf("gaction: %v", gaction) - t.Errorf("len(action) = %d, want %d", len(gaction), wcnt) - return - } - if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2]) - } - - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) { - t.Errorf("action = %s, want Release", gaction[wcnt-1]) - } - }() - - for i := 0; i < snapc+1; i++ { - srv.Do(context.Background(), pb.Request{Method: "PUT"}) - } - - <-donec - srv.Stop() -} - // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with // proposals. func TestConcurrentApplyAndSnapshotV3(t *testing.T) { @@ -1631,53 +1337,6 @@ func TestPublishV3Retry(t *testing.T) { <-ch } -func TestUpdateVersion(t *testing.T) { - n := newNodeRecorder() - ch := make(chan any, 1) - // simulate that request has gone through consensus - ch <- Response{} - w := wait.NewWithResponse(ch) - ctx, cancel := context.WithCancel(context.TODO()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - memberId: 1, - Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}), - attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - cluster: &membership.RaftCluster{}, - w: w, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - - ctx: ctx, - cancel: cancel, - } - srv.updateClusterVersionV2("2.0.0") - - action := n.Action() - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { - t.Fatalf("unmarshal request error: %v", err) - } - if r.Method != "PUT" { - t.Errorf("method = %s, want PUT", r.Method) - } - if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath { - t.Errorf("path = %s, want %s", r.Path, wpath) - } - if r.Val != "2.0.0" { - t.Errorf("val = %s, want %s", r.Val, "2.0.0") - } -} - func TestUpdateVersionV3(t *testing.T) { n := newNodeRecorder() ch := make(chan any, 1) @@ -1914,25 +1573,6 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange return &raftpb.ConfState{} } -// nodeCommitter commits proposed data immediately. -type nodeCommitter struct { - readyNode - index uint64 -} - -func newNodeCommitter() raft.Node { - return &nodeCommitter{*newNopReadyNode(), 0} -} -func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error { - n.index++ - ents := []raftpb.Entry{{Index: n.index, Data: data}} - n.readyc <- raft.Ready{ - Entries: ents, - CommittedEntries: ents, - } - return nil -} - func newTestCluster(t testing.TB) *membership.RaftCluster { return membership.NewCluster(zaptest.NewLogger(t)) } diff --git a/server/etcdserver/v2_server.go b/server/etcdserver/v2_server.go index 517d7ca7f70..8636204b544 100644 --- a/server/etcdserver/v2_server.go +++ b/server/etcdserver/v2_server.go @@ -15,152 +15,11 @@ package etcdserver import ( - "context" - "time" - pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/server/v3/etcdserver/api/membership" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/etcdserver/errors" ) type RequestV2 pb.Request -type RequestV2Handler interface { - Post(ctx context.Context, r *RequestV2) (Response, error) - Put(ctx context.Context, r *RequestV2) (Response, error) - Delete(ctx context.Context, r *RequestV2) (Response, error) - QGet(ctx context.Context, r *RequestV2) (Response, error) - Get(ctx context.Context, r *RequestV2) (Response, error) - Head(ctx context.Context, r *RequestV2) (Response, error) -} - -type reqV2HandlerEtcdServer struct { - reqV2HandlerStore - s *EtcdServer -} - -type reqV2HandlerStore struct { - store v2store.Store - applier ApplierV2 -} - -func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler { - return &reqV2HandlerStore{s, applier} -} - -func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Post(r), nil -} - -func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Put(r, membership.ApplyBoth), nil -} - -func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.Delete(r), nil -} - -func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.applier.QGet(r), nil -} - -func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) { - if r.Wait { - wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) - return Response{Watcher: wc}, err - } - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) { - ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted) - return Response{Event: ev}, err -} - -func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) { - return a.processRaftRequest(ctx, r) -} - -func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) { - data, err := ((*pb.Request)(r)).Marshal() - if err != nil { - return Response{}, err - } - ch := a.s.w.Register(r.ID) - - start := time.Now() - a.s.r.Propose(ctx, data) - proposalsPending.Inc() - defer proposalsPending.Dec() - - select { - case x := <-ch: - resp := x.(Response) - return resp, resp.Err - case <-ctx.Done(): - proposalsFailed.Inc() - a.s.w.Trigger(r.ID, nil) // GC wait - return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) - case <-a.s.stopping: - } - return Response{}, errors.ErrStopped -} - -func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { - r.ID = s.reqIDGen.Next() - h := &reqV2HandlerEtcdServer{ - reqV2HandlerStore: reqV2HandlerStore{ - store: s.v2store, - applier: s.applyV2, - }, - s: s, - } - rp := &r - resp, err := ((*RequestV2)(rp)).Handle(ctx, h) - resp.Term, resp.Index = s.Term(), s.CommittedIndex() - return resp, err -} - -// Handle interprets r and performs an operation on s.store according to r.Method -// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with -// Quorum == true, r will be sent through consensus before performing its -// respective operation. Do will block until an action is performed or there is -// an error. -func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) { - if r.Method == "GET" && r.Quorum { - r.Method = "QGET" - } - switch r.Method { - case "POST": - return v2api.Post(ctx, r) - case "PUT": - return v2api.Put(ctx, r) - case "DELETE": - return v2api.Delete(ctx, r) - case "QGET": - return v2api.QGet(ctx, r) - case "GET": - return v2api.Get(ctx, r) - case "HEAD": - return v2api.Head(ctx, r) - } - return Response{}, errors.ErrUnknownMethod -} - func (r *RequestV2) String() string { rpb := pb.Request(*r) return rpb.String()