From 2891693617e1051bd5a5c950857b402ae722bff8 Mon Sep 17 00:00:00 2001
From: yoyinzyc
Date: Fri, 15 Nov 2019 16:22:47 -0800
Subject: [PATCH] etcdserver: change "/downgrade/enabled" endpoint to serve
 linearized data

---
 etcdserver/api/etcdhttp/peer.go           | 48 +++---------
 etcdserver/api/etcdhttp/peer_test.go      | 58 +---------------------
 etcdserver/api/membership/cluster.go      | 24 ++++-----
 etcdserver/api/membership/cluster_test.go | 22 ++++-----
 etcdserver/api/membership/store.go        |  6 +--
 etcdserver/apply.go                       |  8 +--
 etcdserver/cluster_util.go                | 47 +++++++-----------
 etcdserver/server.go                      | 60 +++++++++++++++++++++--
 etcdserver/v3_server.go                   | 20 ++++----
 integration/v3_downgrade_test.go          |  2 +-
 10 files changed, 122 insertions(+), 173 deletions(-)

diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go
index e545158630c7..6018cf0e0463 100644
--- a/etcdserver/api/etcdhttp/peer.go
+++ b/etcdserver/api/etcdhttp/peer.go
@@ -39,13 +39,12 @@ const (
 
 // NewPeerHandler generates an http.Handler to handle etcd peer requests.
 func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerHTTP) http.Handler {
-	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s)
+	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.DowngradeEnabledHandler())
 }
 
-func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, ds etcdserver.ServerDowngradeHTTP) http.Handler {
+func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, downgradeEnabledHandler http.Handler) http.Handler {
 	peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
 	peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
-	downgradeEnabledHandler := newDowngradeEnabledHandler(lg, s.Cluster(), ds)
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", http.NotFound)
@@ -53,11 +52,14 @@ func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handle
 	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
 	mux.Handle(peerMembersPath, peerMembersHandler)
 	mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
-	mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
 	if leaseHandler != nil {
 		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
 		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
 	}
+
+	if downgradeEnabledHandler != nil {
+		mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
+	}
 	mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
 	return mux
 }
@@ -88,20 +90,6 @@ type peerMemberPromoteHandler struct {
 	server etcdserver.Server
 }
 
-func newDowngradeEnabledHandler(lg *zap.Logger, cluster api.Cluster, s etcdserver.ServerDowngradeHTTP) http.Handler {
-	return &downgradeEnabledHandler{
-		lg:      lg,
-		cluster: cluster,
-		server:  s,
-	}
-}
-
-type downgradeEnabledHandler struct {
-	lg      *zap.Logger
-	cluster api.Cluster
-	server  etcdserver.ServerDowngradeHTTP
-}
-
 func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if !allowMethod(w, r, "GET") {
 		return
@@ -174,27 +162,3 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
 		}
 	}
 }
-
-func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if !allowMethod(w, r, "GET") {
-		return
-	}
-	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
-
-	if r.URL.Path != downgradeEnabledPath {
-		http.Error(w, "bad path", http.StatusBadRequest)
-		return
-	}
-
-	enabled := h.server.DowngradeInfo().Enabled
-	w.Header().Set("Content-Type", "application/json")
"application/json") - b, err := json.Marshal(enabled) - if err != nil { - if h.lg != nil { - h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err)) - } else { - plog.Warningf("failed to marshal downgrade.Enabled to json (%v)", err) - } - } - w.Write(b) -} diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go index c61193b16363..7033ac9b42ce 100644 --- a/etcdserver/api/etcdhttp/peer_test.go +++ b/etcdserver/api/etcdhttp/peer_test.go @@ -42,7 +42,7 @@ type fakeCluster struct { localID uint64 clientURLs []string members map[uint64]*membership.Member - downgrade *membership.Downgrade + downgrade *membership.DowngradeInfo } func (c *fakeCluster) ID() types.ID { return types.ID(c.id) } @@ -78,8 +78,6 @@ func (s *fakeServer) ClusterVersion() *semver.Version { return nil } func (s *fakeServer) Cluster() api.Cluster { return s.cluster } func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } -func (s *fakeServer) DowngradeInfo() *membership.Downgrade { return s.cluster.downgrade } - var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test data")) }) @@ -282,57 +280,3 @@ func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) { } } } - -// TestServeDowngradeEnabledGet verifies the request to get local downgrade enabled status -func TestServeDowngradeEnabledGet(t *testing.T) { - d := &membership.Downgrade{Enabled: true} - cluster := &fakeCluster{ - id: 1, - downgrade: d, - } - s := fakeServer{cluster} - h := newDowngradeEnabledHandler(nil, cluster, &s) - b, err := json.Marshal(d.Enabled) - if err != nil { - t.Fatal(err) - } - str := string(b) - - tests := []struct { - name string - path string - wcode int - wct string - wbody string - }{ - {"Succeeded", downgradeEnabledPath, http.StatusOK, "application/json", str}, - {"Failed with bad path", path.Join(downgradeEnabledPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"}, - } - - for i, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - req, err := http.NewRequest("GET", testutil.MustNewURL(t, tt.path).String(), nil) - if err != nil { - t.Fatal(err) - } - rw := httptest.NewRecorder() - h.ServeHTTP(rw, req) - - if rw.Code != tt.wcode { - t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode) - } - if gct := rw.Header().Get("Content-Type"); gct != tt.wct { - t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct) - } - if rw.Body.String() != tt.wbody { - t.Errorf("#%d: body = %s, want %s", i, rw.Body.String(), tt.wbody) - } - gcid := rw.Header().Get("X-Etcd-Cluster-ID") - wcid := cluster.ID().String() - if gcid != wcid { - t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid) - } - }) - - } -} diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index 87e2ebf4282d..608f5fad0ed2 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -60,10 +60,10 @@ type RaftCluster struct { // removed id cannot be reused. 
 	removed map[types.ID]bool
 
-	downgradeInfo *Downgrade
+	downgradeInfo *DowngradeInfo
 }
 
-type Downgrade struct {
+type DowngradeInfo struct {
 	// TargetVersion is the target downgrade version, if the cluster is not under downgrading,
 	// the targetVersion will be nil
 	TargetVersion *semver.Version
@@ -113,7 +113,7 @@ func NewCluster(lg *zap.Logger, token string) *RaftCluster {
 		token:         token,
 		members:       make(map[types.ID]*Member),
 		removed:       make(map[types.ID]bool),
-		downgradeInfo: &Downgrade{Enabled: false},
+		downgradeInfo: &DowngradeInfo{Enabled: false},
 	}
 }
 
@@ -262,11 +262,11 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 	c.version = clusterVersionFromBackend(c.be)
 	c.downgradeInfo = downgradeFromBackend(c.be)
 
-	var d *Downgrade
+	var d *DowngradeInfo
 	if c.downgradeInfo == nil {
-		d = &Downgrade{Enabled: false}
+		d = &DowngradeInfo{Enabled: false}
 	} else {
-		d = &Downgrade{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
+		d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
 	}
 	mustDetectDowngrade(c.lg, c.version, d)
 	onSet(c.lg, c.version)
@@ -806,7 +806,7 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
 	return nil
 }
 
-func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *Downgrade) {
+func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) {
 	lv := semver.Must(semver.NewVersion(version.Version))
 	// only keep major.minor version for comparison against cluster version
 	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
@@ -918,18 +918,18 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
 	return ids
 }
 
-// Downgrade returns the capability status of the cluster
-func (c *RaftCluster) Downgrade() *Downgrade {
+// DowngradeInfo returns the downgrade status of the cluster
+func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
 	c.Lock()
 	defer c.Unlock()
 	if c.downgradeInfo == nil {
-		return &Downgrade{Enabled: false}
+		return &DowngradeInfo{Enabled: false}
 	}
-	d := &Downgrade{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
+	d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
 	return d
 }
 
-func (c *RaftCluster) UpdateDowngrade(d *Downgrade) {
+func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) {
 	c.Lock()
 	defer c.Unlock()
 
diff --git a/etcdserver/api/membership/cluster_test.go b/etcdserver/api/membership/cluster_test.go
index 4c541f0a90aa..92feb6ddd54e 100644
--- a/etcdserver/api/membership/cluster_test.go
+++ b/etcdserver/api/membership/cluster_test.go
@@ -877,15 +877,15 @@ func TestMustDetectDowngrade(t *testing.T) {
 	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
 	oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
 	oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
-	downgradeEnabledHigherVersion := &Downgrade{Enabled: true, TargetVersion: oneMinorHigher}
-	downgradeEnabledEqualVersion := &Downgrade{Enabled: true, TargetVersion: lv}
-	downgradeEnabledLowerVersion := &Downgrade{Enabled: true, TargetVersion: oneMinorLower}
-	downgradeDisabled := &Downgrade{Enabled: false}
+	downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher}
+	downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv}
+	downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower}
+	downgradeDisabled := &DowngradeInfo{Enabled: false}
 
 	tests := []struct {
 		name           string
 		clusterVersion *semver.Version
-		downgrade      *Downgrade
+		downgrade      *DowngradeInfo
 		success        bool
 		message        string
 	}{
@@ -983,13 +983,11 @@ func TestMustDetectDowngrade(t *testing.T) {
 
 			data, err := ioutil.ReadFile(logPath)
 			if err == nil {
-				t.Log(len(data))
 				if !bytes.Contains(data, []byte(tt.message)) {
 					t.Errorf("Expected to find %v in log", tt.message)
-					t.Log(string(data))
 				}
 			} else {
-				t.Log(err)
+				t.Fatal(err)
 			}
 
 			if !tt.success {
@@ -1000,7 +998,7 @@ func TestMustDetectDowngrade(t *testing.T) {
 			}
 
 			if tt.success && errCmd != nil {
-				t.Errorf("Expected not failure; Got %v", err)
+				t.Errorf("Expected not failure; Got %v", errCmd)
 			}
 		})
 	}
@@ -1082,19 +1080,19 @@ func TestGetDowngrade(t *testing.T) {
 			nil,
 		},
 		{
-			&RaftCluster{downgradeInfo: &Downgrade{Enabled: false}},
+			&RaftCluster{downgradeInfo: &DowngradeInfo{Enabled: false}},
 			false,
 			nil,
 		},
 		{
-			&RaftCluster{downgradeInfo: &Downgrade{Enabled: true, TargetVersion: semver.Must(semver.NewVersion("3.4.0"))}},
+			&RaftCluster{downgradeInfo: &DowngradeInfo{Enabled: true, TargetVersion: semver.Must(semver.NewVersion("3.4.0"))}},
 			true,
 			semver.Must(semver.NewVersion("3.4.0")),
 		},
 	}
 	for i, tt := range tests {
 		t.Run(string(i), func(t *testing.T) {
-			d := tt.cluster.Downgrade()
+			d := tt.cluster.DowngradeInfo()
 			if d.Enabled != tt.expectedEnabled {
 				t.Errorf("Expected %v; Got %v", tt.expectedEnabled, d.Enabled)
 			}
diff --git a/etcdserver/api/membership/store.go b/etcdserver/api/membership/store.go
index b65eb71ec3d2..2d057043222b 100644
--- a/etcdserver/api/membership/store.go
+++ b/etcdserver/api/membership/store.go
@@ -75,7 +75,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
 	tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
 }
 
-func mustSaveDowngradeToBackend(be backend.Backend, downgrade *Downgrade) {
+func mustSaveDowngradeToBackend(be backend.Backend, downgrade *DowngradeInfo) {
 	dkey := backendDowngradeKey()
 	dvalue, err := json.Marshal(downgrade)
 	if err != nil {
@@ -88,7 +88,7 @@ func mustSaveDowngradeToBackend(be backend.Backend, downgrade *Downgrade) {
 	tx.UnsafePut(clusterBucketName, dkey, dvalue)
 }
 
-func downgradeFromBackend(be backend.Backend) *Downgrade {
+func downgradeFromBackend(be backend.Backend) *DowngradeInfo {
 	dkey := backendDowngradeKey()
 	if be != nil {
 		tx := be.BatchTx()
@@ -97,7 +97,7 @@ func downgradeFromBackend(be backend.Backend) *Downgrade {
 
 		_, vs := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)
 		if len(vs) != 0 {
-			var d Downgrade
+			var d DowngradeInfo
 			if err := json.Unmarshal(vs[0], &d); err != nil {
 				plog.Panicf("fail to unmarshal downgrade: %v", err)
 			}
diff --git a/etcdserver/apply.go b/etcdserver/apply.go
index 7d03f77fe2e7..37ad0c8c005c 100644
--- a/etcdserver/apply.go
+++ b/etcdserver/apply.go
@@ -690,15 +690,15 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
 }
 
 func (a *applierV3backend) Downgrade(dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
-	var d membership.Downgrade
+	var d membership.DowngradeInfo
 	switch dr.Action {
 	case pb.DowngradeRequest_ENABLE:
 		v := dr.Version
-		d = membership.Downgrade{Enabled: true, TargetVersion: semver.Must(semver.NewVersion(v))}
+		d = membership.DowngradeInfo{Enabled: true, TargetVersion: semver.Must(semver.NewVersion(v))}
 	case pb.DowngradeRequest_CANCEL:
-		d = membership.Downgrade{Enabled: false}
+		d = membership.DowngradeInfo{Enabled: false}
 	}
-	a.s.cluster.UpdateDowngrade(&d)
+	a.s.cluster.SetDowngradeInfo(&d)
 	resp := &pb.DowngradeResponse{Version: a.s.ClusterVersion().String()}
 	return resp, nil
 }
diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go
index d9528763eac3..943645f5bc0b 100644
--- a/etcdserver/cluster_util.go
+++ b/etcdserver/cluster_util.go
@@ -178,60 +178,47 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
 }
 
 // decideAllowedVersionRange decides the available version range of the cluster that local server can join in;
-// if the downgrade enable status of the cluster is not decided(some server allows downgrade some not),
-// the version window is [localVersion, localVersion];
-// if the downgrade enable status is true, the version window is [localVersion, oneMinorHigher]
+// if the downgrade enabled status is true, the version window is [localVersion, oneMinorHigher]
 // if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
-func decideAllowedVersionRange(enables []bool) (minV *semver.Version, maxV *semver.Version) {
+func decideAllowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *semver.Version) {
 	minV = semver.Must(semver.NewVersion(version.MinClusterVersion))
 	maxV = semver.Must(semver.NewVersion(version.Version))
 	maxV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
-	if len(enables) == 0 {
-		return minV, maxV
-	}
-	enable := enables[0]
-	for _, e := range enables {
-		// if the downgrade enable status of the cluster is not decided,
-		// the local server can only join into a cluster with exactly same version
-		if e != enable {
-			minV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
-			return minV, maxV
-		}
-	}
-
-	if enable {
+	if downgradeEnabled {
 		minV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
 		maxV.Minor = maxV.Minor + 1
 	}
 
 	return minV, maxV
 }
 
-func getDowngradableOfCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) []bool {
+// decideDowngradeEnabled will decide the downgrade enabled status of the cluster.
+func decideDowngradeEnabled(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
 	members := cl.Members()
-	var enables []bool
+
 	for _, m := range members {
 		if m.ID == local {
 			continue
 		}
-		enable, err := getDowngradable(lg, m, rt)
+		enable, err := getDowngradeEnabled(lg, m, rt)
 		if err != nil {
 			if lg != nil {
 				lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
 			} else {
 				plog.Warningf("cannot get the downgrade enabled status of member %s (%v)", m.ID, err)
 			}
-			enables = append(enables, false)
 		} else {
-			enables = append(enables, enable)
+			// Since the "/downgrade/enabled" serves linearized data,
+			// this function can return once it gets a non-error response from the endpoint.
+			return enable
 		}
 	}
-	return enables
+	return false
 }
 
-// getDowngradeStatus returns the downgrade status of the given member
+// getDowngradeEnabled returns the downgrade enabled status of the given member
 // via its peerURLs. Returns the last error if it fails to get it.
-func getDowngradable(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
+func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
 	cc := &http.Client{
 		Transport: rt,
 	}
@@ -241,7 +228,7 @@ func getDowngradable(lg *zap.Logger, m *membership.Member, rt http.RoundTripper)
 	)
 
 	for _, u := range m.PeerURLs {
-		addr := u + "/downgrade/enable"
+		addr := u + "/downgrade/enabled"
 		resp, err = cc.Get(addr)
 		if err != nil {
 			if lg != nil {
@@ -338,7 +325,9 @@ func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *se
 	return cv
 }
 
-func decideDowngradeStatus(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
+// isDowngradeFinished decides the cluster downgrade status based on the given versions map.
+// It returns true only if all servers have been downgraded to the target version.
+func isDowngradeFinished(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
 	for mid, ver := range vers {
 		if ver == nil {
 			return false
@@ -372,7 +361,7 @@ func decideDowngradeStatus(lg *zap.Logger, targetVersion *semver.Version, vers m
 // We set this rule since when the local member joins, another member might be offline.
 func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
 	vers := getVersions(lg, cl, local, rt)
-	minV, maxV := decideAllowedVersionRange(getDowngradableOfCluster(lg, cl, local, rt))
+	minV, maxV := decideAllowedVersionRange(decideDowngradeEnabled(lg, cl, local, rt))
 	return isCompatibleWithVers(lg, vers, local, minV, maxV)
 }
 
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 1f0f2cbb38d8..a7ce579bc48d 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -103,6 +103,8 @@ const (
 
 	// Todo: need to be decided
 	monitorDowngradeInterval = time.Second
+
+	downgradeHTTPTimeout = 5 * time.Second
 )
 
 var (
@@ -880,10 +882,60 @@ type ServerPeerHTTP interface {
 
 type ServerDowngradeHTTP interface {
 	// DowngradeInfo is the downgrade information of the cluster
-	DowngradeInfo() *membership.Downgrade
+	DowngradeInfo() *membership.DowngradeInfo
+	DowngradeEnabledHandler() http.Handler
+}
+
+func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
+
+type downgradeEnabledHandler struct {
+	lg      *zap.Logger
+	cluster api.Cluster
+	server  *EtcdServer
+}
+
+func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
+	return &downgradeEnabledHandler{
+		lg:      s.getLogger(),
+		cluster: s.cluster,
+		server:  s,
+	}
 }
 
-func (s *EtcdServer) DowngradeInfo() *membership.Downgrade { return s.cluster.Downgrade() }
+func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "GET" {
+		w.Header().Set("Allow", "GET")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+	if r.URL.Path != "/downgrade/enabled" {
+		http.Error(w, "bad path", http.StatusBadRequest)
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), downgradeHTTPTimeout)
+	defer cancel()
+
+	// serve with linearized downgrade info
+	if err := h.server.linearizableReadNotify(ctx); err != nil {
+		http.Error(w, "failed linearized read", http.StatusInternalServerError)
+		return
+	}
+	enabled := h.server.DowngradeInfo().Enabled
+	w.Header().Set("Content-Type", "application/json")
+	b, err := json.Marshal(enabled)
+	if err != nil {
+		if h.lg != nil {
+			h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err))
+		} else {
+			plog.Warningf("failed to marshal downgrade.Enabled to json (%v)", err)
+		}
+	}
+	w.Write(b)
+}
 
 // Process takes a raft message and applies it to the server's raft state
 // machine, respecting any timeout of the given context.
@@ -2591,13 +2643,13 @@ func (s *EtcdServer) monitorDowngrade() {
 			continue
 		}
 
-		d := s.cluster.Downgrade()
+		d := s.cluster.DowngradeInfo()
 		if !d.Enabled {
 			continue
 		}
 
 		targetVersion := d.TargetVersion
-		if decideDowngradeStatus(s.getLogger(), targetVersion, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
+		if isDowngradeFinished(s.getLogger(), targetVersion, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) {
 			if lg != nil {
 				lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion.String()))
 			} else {
diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go
index 0664a1b31be4..f8a5c4724fbb 100644
--- a/etcdserver/v3_server.go
+++ b/etcdserver/v3_server.go
@@ -822,13 +822,19 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
 	resp := &pb.DowngradeResponse{}
 	var err error
 
-	cv := s.ClusterVersion()
 	targetVersion, err := semver.NewVersion(v)
 	if err != nil {
 		return nil, fmt.Errorf("wrong version format: %v", err)
 	}
 	targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}
 
+	// do linearized read to avoid using stale downgrade information
+	err = s.linearizableReadNotify(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	cv := s.ClusterVersion()
 	resp.Version = cv.String()
 	if cv.LessThan(*targetVersion) {
 		err = errors.New("target version too high")
@@ -847,16 +853,11 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
 		return nil, err
 	}
 
-	err = s.linearizableReadNotify(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	downgradeInfo := s.cluster.Downgrade()
+	downgradeInfo := s.cluster.DowngradeInfo()
 	if downgradeInfo.Enabled {
 		// Todo: return the downgrade status along with the error msg
-		err = errors.New("the cluster has a ongoing downgrade job")
+		err = errors.New("the cluster has an ongoing downgrade job")
 		return resp, err
 	}
 	return resp, nil
 }
@@ -885,11 +886,12 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest
 }
 
 func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
+	// do linearized read to avoid using stale downgrade information
 	if err := s.linearizableReadNotify(ctx); err != nil {
 		return nil, err
 	}
 
-	downgradeInfo := s.cluster.Downgrade()
+	downgradeInfo := s.cluster.DowngradeInfo()
 	if !downgradeInfo.Enabled {
 		return nil, errors.New("the cluster is not downgrading")
 	}
diff --git a/integration/v3_downgrade_test.go b/integration/v3_downgrade_test.go
index c07aeec4dfd0..fe05a38761e5 100644
--- a/integration/v3_downgrade_test.go
+++ b/integration/v3_downgrade_test.go
@@ -157,7 +157,7 @@ func TestDowngradeEnable(t *testing.T) {
 		}
 
 		if err != nil {
-			expectedErrorMsg := "the cluster has a ongoing downgrade job"
+			expectedErrorMsg := "the cluster has an ongoing downgrade job"
 			if !strings.Contains(err.Error(), expectedErrorMsg) {
 				t.Errorf("expected the error message contains %v; got %v", expectedErrorMsg, err.Error())
 			}