Skip to content

Commit

Permalink
membership: add downgradeInfo to cluster; add getter and setter of do…
Browse files Browse the repository at this point in the history
…wngradeInfo
  • Loading branch information
YoyinZyc committed Apr 16, 2020
1 parent 1166b1f commit f2c5dd4
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
82 changes: 78 additions & 4 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ type RaftCluster struct {
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
removed map[types.ID]bool

downgradeInfo *DowngradeInfo
}

type DowngradeInfo struct {
// TargetVersion is the target downgrade version, if the cluster is not under downgrading,
// the targetVersion will be an empty string
TargetVersion string `json:"target-version"`
// Enabled indicates whether the cluster is enabled to downgrade
Enabled bool `json:"enabled"`
}

// ConfigChangeContext represents a context for confChange.
Expand Down Expand Up @@ -102,10 +112,11 @@ func NewCluster(lg *zap.Logger, token string) *RaftCluster {
lg = zap.NewNop()
}
return &RaftCluster{
lg: lg,
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
lg: lg,
token: token,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
downgradeInfo: &DowngradeInfo{Enabled: false},
}
}

Expand Down Expand Up @@ -691,6 +702,39 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
return semver.Must(semver.NewVersion(string(vals[0])))
}

func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
dkey := backendDowngradeKey()
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
keys, vals := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)
if len(keys) == 0 {
return nil
}

if len(keys) != 1 {
lg.Panic(
"unexpected number of keys when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}
var d DowngradeInfo
if err := json.Unmarshal(vals[0], &d); err != nil {
lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
}

// verify the downgrade info from backend
if d.Enabled {
if _, err := semver.NewVersion(d.TargetVersion); err != nil {
lg.Panic(
"unexpected version format of the downgrade target version from backend",
zap.String("target-version", d.TargetVersion),
)
}
}
return &d
}

// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.
Expand Down Expand Up @@ -752,6 +796,36 @@ func (c *RaftCluster) IsLocalMemberLearner() bool {
return localMember.IsLearner
}

// DowngradeInfo returns the downgrade status of the cluster
func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
c.Lock()
defer c.Unlock()
if c.downgradeInfo == nil {
return &DowngradeInfo{Enabled: false}
}
d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
return d
}

func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo) {
c.Lock()
defer c.Unlock()

if c.be != nil {
mustSaveDowngradeToBackend(c.lg, c.be, d)
}

c.downgradeInfo = d

if d.Enabled {
c.lg.Info(
"The server is ready to downgrade",
zap.String("target-version", d.TargetVersion),
zap.String("server-version", version.Version),
)
}
}

// IsMemberExist returns if the member with the given id exists in cluster.
func (c *RaftCluster) IsMemberExist(id types.ID) bool {
c.Lock()
Expand Down
20 changes: 18 additions & 2 deletions etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {

tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(membersBucketName, mkey, mvalue)
tx.Unlock()
}

func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
mkey := backendMemberKey(id)

tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(membersBucketName, mkey)
tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
tx.Unlock()
}

func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
Expand All @@ -76,6 +76,18 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
}

func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
dkey := backendDowngradeKey()
dvalue, err := json.Marshal(downgrade)
if err != nil {
lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(clusterBucketName, dkey, dvalue)
}

func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
Expand Down Expand Up @@ -184,6 +196,10 @@ func backendClusterVersionKey() []byte {
return []byte("clusterVersion")
}

func backendDowngradeKey() []byte {
return []byte("downgrade")
}

func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
Expand Down

0 comments on commit f2c5dd4

Please sign in to comment.