Skip to content

Commit

Permalink
etcdserver: change "/downgrade/enabled" endpoint to serve linearized …
Browse files Browse the repository at this point in the history
…data
  • Loading branch information
YoyinZyc committed Nov 16, 2019
1 parent 92c7e20 commit 2891693
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 173 deletions.
48 changes: 6 additions & 42 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,27 @@ const (

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerHTTP) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s)
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, ds etcdserver.ServerDowngradeHTTP) http.Handler {
func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler, downgradeEnabledHandler http.Handler) http.Handler {
peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
downgradeEnabledHandler := newDowngradeEnabledHandler(lg, s.Cluster(), ds)

mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}

if downgradeEnabledHandler != nil {
mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
}
mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
return mux
}
Expand Down Expand Up @@ -88,20 +90,6 @@ type peerMemberPromoteHandler struct {
server etcdserver.Server
}

func newDowngradeEnabledHandler(lg *zap.Logger, cluster api.Cluster, s etcdserver.ServerDowngradeHTTP) http.Handler {
return &downgradeEnabledHandler{
lg: lg,
cluster: cluster,
server: s,
}
}

type downgradeEnabledHandler struct {
lg *zap.Logger
cluster api.Cluster
server etcdserver.ServerDowngradeHTTP
}

func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "GET") {
return
Expand Down Expand Up @@ -174,27 +162,3 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
}
}
}

func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "GET") {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

if r.URL.Path != downgradeEnabledPath {
http.Error(w, "bad path", http.StatusBadRequest)
return
}

enabled := h.server.DowngradeInfo().Enabled
w.Header().Set("Content-Type", "application/json")
b, err := json.Marshal(enabled)
if err != nil {
if h.lg != nil {
h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err))
} else {
plog.Warningf("failed to marshal downgrade.Enabled to json (%v)", err)
}
}
w.Write(b)
}
58 changes: 1 addition & 57 deletions etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type fakeCluster struct {
localID uint64
clientURLs []string
members map[uint64]*membership.Member
downgrade *membership.Downgrade
downgrade *membership.DowngradeInfo
}

func (c *fakeCluster) ID() types.ID { return types.ID(c.id) }
Expand Down Expand Up @@ -78,8 +78,6 @@ func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }

func (s *fakeServer) DowngradeInfo() *membership.Downgrade { return s.cluster.downgrade }

var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
Expand Down Expand Up @@ -282,57 +280,3 @@ func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
}
}
}

// TestServeDowngradeEnabledGet verifies the request to get local downgrade enabled status
func TestServeDowngradeEnabledGet(t *testing.T) {
d := &membership.Downgrade{Enabled: true}
cluster := &fakeCluster{
id: 1,
downgrade: d,
}
s := fakeServer{cluster}
h := newDowngradeEnabledHandler(nil, cluster, &s)
b, err := json.Marshal(d.Enabled)
if err != nil {
t.Fatal(err)
}
str := string(b)

tests := []struct {
name string
path string
wcode int
wct string
wbody string
}{
{"Succeeded", downgradeEnabledPath, http.StatusOK, "application/json", str},
{"Failed with bad path", path.Join(downgradeEnabledPath, "bad"), http.StatusBadRequest, "text/plain; charset=utf-8", "bad path\n"},
}

for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest("GET", testutil.MustNewURL(t, tt.path).String(), nil)
if err != nil {
t.Fatal(err)
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, req)

if rw.Code != tt.wcode {
t.Errorf("#%d: code=%d, want %d", i, rw.Code, tt.wcode)
}
if gct := rw.Header().Get("Content-Type"); gct != tt.wct {
t.Errorf("#%d: content-type = %s, want %s", i, gct, tt.wct)
}
if rw.Body.String() != tt.wbody {
t.Errorf("#%d: body = %s, want %s", i, rw.Body.String(), tt.wbody)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := cluster.ID().String()
if gcid != wcid {
t.Errorf("#%d: cid = %s, want %s", i, gcid, wcid)
}
})

}
}
24 changes: 12 additions & 12 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ type RaftCluster struct {
// removed id cannot be reused.
removed map[types.ID]bool

downgradeInfo *Downgrade
downgradeInfo *DowngradeInfo
}

type Downgrade struct {
type DowngradeInfo struct {
// TargetVersion is the target downgrade version, if the cluster is not under downgrading,
// the targetVersion will be nil
TargetVersion *semver.Version
Expand Down Expand Up @@ -113,7 +113,7 @@ func NewCluster(lg *zap.Logger, token string) *RaftCluster {
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
downgradeInfo: &Downgrade{Enabled: false},
downgradeInfo: &DowngradeInfo{Enabled: false},
}
}

Expand Down Expand Up @@ -262,11 +262,11 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.version = clusterVersionFromBackend(c.be)

c.downgradeInfo = downgradeFromBackend(c.be)
var d *Downgrade
var d *DowngradeInfo
if c.downgradeInfo == nil {
d = &Downgrade{Enabled: false}
d = &DowngradeInfo{Enabled: false}
} else {
d = &Downgrade{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
}
mustDetectDowngrade(c.lg, c.version, d)
onSet(c.lg, c.version)
Expand Down Expand Up @@ -806,7 +806,7 @@ func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *R
return nil
}

func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *Downgrade) {
func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version, d *DowngradeInfo) {
lv := semver.Must(semver.NewVersion(version.Version))
// only keep major.minor version for comparison against cluster version
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
Expand Down Expand Up @@ -918,18 +918,18 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
return ids
}

// Downgrade returns the capability status of the cluster
func (c *RaftCluster) Downgrade() *Downgrade {
// DowngradeInfo returns the capability status of the cluster
func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
c.Lock()
defer c.Unlock()
if c.downgradeInfo == nil {
return &Downgrade{Enabled: false}
return &DowngradeInfo{Enabled: false}
}
d := &Downgrade{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
return d
}

func (c *RaftCluster) UpdateDowngrade(d *Downgrade) {
func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) {
c.Lock()
defer c.Unlock()

Expand Down
22 changes: 10 additions & 12 deletions etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,15 +877,15 @@ func TestMustDetectDowngrade(t *testing.T) {
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
downgradeEnabledHigherVersion := &Downgrade{Enabled: true, TargetVersion: oneMinorHigher}
downgradeEnabledEqualVersion := &Downgrade{Enabled: true, TargetVersion: lv}
downgradeEnabledLowerVersion := &Downgrade{Enabled: true, TargetVersion: oneMinorLower}
downgradeDisabled := &Downgrade{Enabled: false}
downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher}
downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv}
downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower}
downgradeDisabled := &DowngradeInfo{Enabled: false}

tests := []struct {
name string
clusterVersion *semver.Version
downgrade *Downgrade
downgrade *DowngradeInfo
success bool
message string
}{
Expand Down Expand Up @@ -983,13 +983,11 @@ func TestMustDetectDowngrade(t *testing.T) {

data, err := ioutil.ReadFile(logPath)
if err == nil {
t.Log(len(data))
if !bytes.Contains(data, []byte(tt.message)) {
t.Errorf("Expected to find %v in log", tt.message)
t.Log(string(data))
}
} else {
t.Log(err)
t.Fatal(err)
}

if !tt.success {
Expand All @@ -1000,7 +998,7 @@ func TestMustDetectDowngrade(t *testing.T) {
}

if tt.success && errCmd != nil {
t.Errorf("Expected not failure; Got %v", err)
t.Errorf("Expected not failure; Got %v", errCmd)
}
})
}
Expand Down Expand Up @@ -1082,19 +1080,19 @@ func TestGetDowngrade(t *testing.T) {
nil,
},
{
&RaftCluster{downgradeInfo: &Downgrade{Enabled: false}},
&RaftCluster{downgradeInfo: &DowngradeInfo{Enabled: false}},
false,
nil,
},
{
&RaftCluster{downgradeInfo: &Downgrade{Enabled: true, TargetVersion: semver.Must(semver.NewVersion("3.4.0"))}},
&RaftCluster{downgradeInfo: &DowngradeInfo{Enabled: true, TargetVersion: semver.Must(semver.NewVersion("3.4.0"))}},
true,
semver.Must(semver.NewVersion("3.4.0")),
},
}
for i, tt := range tests {
t.Run(string(i), func(t *testing.T) {
d := tt.cluster.Downgrade()
d := tt.cluster.DowngradeInfo()
if d.Enabled != tt.expectedEnabled {
t.Errorf("Expected %v; Got %v", tt.expectedEnabled, d.Enabled)
}
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
}

func mustSaveDowngradeToBackend(be backend.Backend, downgrade *Downgrade) {
func mustSaveDowngradeToBackend(be backend.Backend, downgrade *DowngradeInfo) {
dkey := backendDowngradeKey()
dvalue, err := json.Marshal(downgrade)
if err != nil {
Expand All @@ -88,7 +88,7 @@ func mustSaveDowngradeToBackend(be backend.Backend, downgrade *Downgrade) {
tx.UnsafePut(clusterBucketName, dkey, dvalue)
}

func downgradeFromBackend(be backend.Backend) *Downgrade {
func downgradeFromBackend(be backend.Backend) *DowngradeInfo {
dkey := backendDowngradeKey()
if be != nil {
tx := be.BatchTx()
Expand All @@ -97,7 +97,7 @@ func downgradeFromBackend(be backend.Backend) *Downgrade {
_, vs := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)

if len(vs) != 0 {
var d Downgrade
var d DowngradeInfo
if err := json.Unmarshal(vs[0], &d); err != nil {
plog.Panicf("fail to unmarshal downgrade: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,15 +690,15 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
}

func (a *applierV3backend) Downgrade(dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
var d membership.Downgrade
var d membership.DowngradeInfo
switch dr.Action {
case pb.DowngradeRequest_ENABLE:
v := dr.Version
d = membership.Downgrade{Enabled: true, TargetVersion: semver.Must(semver.NewVersion(v))}
d = membership.DowngradeInfo{Enabled: true, TargetVersion: semver.Must(semver.NewVersion(v))}
case pb.DowngradeRequest_CANCEL:
d = membership.Downgrade{Enabled: false}
d = membership.DowngradeInfo{Enabled: false}
}
a.s.cluster.UpdateDowngrade(&d)
a.s.cluster.SetDowngradeInfo(&d)
resp := &pb.DowngradeResponse{Version: a.s.ClusterVersion().String()}
return resp, nil
}
Expand Down
Loading

0 comments on commit 2891693

Please sign in to comment.