Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement single node downgrades #13405

Merged
merged 6 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
)

// NewMigrateCommand prints out the version of etcd.
Expand Down Expand Up @@ -90,12 +92,24 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(dbPath)

walPath := datadir.ToWalDir(o.dataDir)
w, err := wal.OpenForRead(GetLogger(), walPath, walpb.Snapshot{})
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %v`, err)
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %v`, err)
}

return c, nil
}

type migrateConfig struct {
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
}

Expand All @@ -112,7 +126,7 @@ func migrateCommandFunc(c *migrateConfig) error {
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}
err = schema.Migrate(lg, tx, *c.targetVersion)
err = schema.Migrate(lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
return err
Expand Down
40 changes: 10 additions & 30 deletions server/etcdserver/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,29 @@ import (
"context"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)

// serverVersionAdapter implements Server interface needed by serverversion.Monitor
type serverVersionAdapter struct {
*EtcdServer
tx backend.BatchTx
}

func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
return &serverVersionAdapter{
EtcdServer: s,
tx: nil,
}
}

var _ serverversion.Server = (*serverVersionAdapter)(nil)

func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
// TODO switch to updateClusterVersionV3 in 3.6
s.GoAttach(func() { s.updateClusterVersionV2(version) })
s.GoAttach(func() { s.updateClusterVersionV3(version) })
}

func (s *serverVersionAdapter) LinearizableReadNotify(ctx context.Context) error {
Expand Down Expand Up @@ -77,34 +72,19 @@ func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions
}

func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx)
if err != nil {
return nil
}
return &v
}

func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
if s.tx == nil {
s.Lock()
defer s.Unlock()
}
err := schema.UnsafeMigrate(s.lg, s.tx, target)
if err != nil {
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
}
}

func (s *serverVersionAdapter) Lock() {
s.tx = s.be.BatchTx()
s.tx.Lock()
}

func (s *serverVersionAdapter) Unlock() {
s.tx.Unlock()
s.tx = nil
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target)
}
23 changes: 9 additions & 14 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
d := &serverversion.DowngradeInfo{Enabled: false}
if c.downgradeInfo != nil {
d = &serverversion.DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
}
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, d)
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
"cluster is downgrading to target version",
zap.String("target-cluster-version", c.downgradeInfo.TargetVersion),
zap.String("current-server-version", sv.String()),
)
}
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
onSet(c.lg, c.version)

for _, m := range c.members {
Expand Down Expand Up @@ -548,7 +551,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
oldVer := c.version
c.version = ver
sv := semver.Must(semver.NewVersion(version.Version))
serverversion.MustDetectDowngrade(c.lg, sv, c.version, c.downgradeInfo)
serverversion.MustDetectDowngrade(c.lg, sv, c.version)
if c.v2store != nil {
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
Expand Down Expand Up @@ -759,14 +762,6 @@ func (c *RaftCluster) SetDowngradeInfo(d *serverversion.DowngradeInfo, shouldApp
}

c.downgradeInfo = d

ptabor marked this conversation as resolved.
Show resolved Hide resolved
if d.Enabled {
c.lg.Info(
"The server is ready to downgrade",
zap.String("target-version", d.TargetVersion),
zap.String("server-version", version.Version),
)
}
}

// IsMemberExist returns if the member with the given id exists in cluster.
Expand Down
24 changes: 2 additions & 22 deletions server/etcdserver/version/downgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,11 @@ func isValidDowngrade(verFrom *semver.Version, verTo *semver.Version) bool {
return verTo.Equal(*allowedDowngradeVersion(verFrom))
}

// MustDetectDowngrade will detect unexpected downgrade when the local server is recovered.
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) {
// MustDetectDowngrade will detect local server joining cluster that doesn't support it's version.
func MustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) {
// only keep major.minor version for comparison against cluster version
sv = &semver.Version{Major: sv.Major, Minor: sv.Minor}

// if the cluster enables downgrade, check local version against downgrade target version.
if d != nil && d.Enabled && d.TargetVersion != "" {
if sv.Equal(*d.GetTargetVersion()) {
if cv != nil {
lg.Info(
"cluster is downgrading to target version",
zap.String("target-cluster-version", d.TargetVersion),
zap.String("determined-cluster-version", version.Cluster(cv.String())),
zap.String("current-server-version", sv.String()),
)
}
return
}
lg.Panic(
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
zap.String("current-server-version", sv.String()),
zap.String("target-cluster-version", d.TargetVersion),
)
}

// if the cluster disables downgrade, check local version against determined cluster version.
// the validation passes when local version is not less than cluster version
if cv != nil && sv.LessThan(*cv) {
Expand Down
61 changes: 8 additions & 53 deletions server/etcdserver/version/downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,92 +29,47 @@ func TestMustDetectDowngrade(t *testing.T) {
lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
oneMinorHigher := &semver.Version{Major: lv.Major, Minor: lv.Minor + 1}
oneMinorLower := &semver.Version{Major: lv.Major, Minor: lv.Minor - 1}
downgradeEnabledHigherVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorHigher.String()}
downgradeEnabledEqualVersion := &DowngradeInfo{Enabled: true, TargetVersion: lv.String()}
downgradeEnabledLowerVersion := &DowngradeInfo{Enabled: true, TargetVersion: oneMinorLower.String()}
downgradeDisabled := &DowngradeInfo{Enabled: false}

tests := []struct {
name string
clusterVersion *semver.Version
downgrade *DowngradeInfo
success bool
message string
}{
{
"Succeeded when downgrade is disabled and cluster version is nil",
"Succeeded when cluster version is nil",
nil,
downgradeDisabled,
true,
"",
},
{
"Succeeded when downgrade is disabled and cluster version is one minor lower",
"Succeeded when cluster version is one minor lower",
oneMinorLower,
downgradeDisabled,
true,
"",
},
{
"Succeeded when downgrade is disabled and cluster version is server version",
"Succeeded when cluster version is server version",
lv,
downgradeDisabled,
true,
"",
},
{
"Failed when downgrade is disabled and server version is lower than determined cluster version ",
"Failed when server version is lower than determined cluster version ",
oneMinorHigher,
downgradeDisabled,
false,
"invalid downgrade; server version is lower than determined cluster version",
},
{
"Succeeded when downgrade is enabled and cluster version is nil",
nil,
downgradeEnabledEqualVersion,
true,
"",
},
{
"Failed when downgrade is enabled and server version is target version",
lv,
downgradeEnabledEqualVersion,
true,
"cluster is downgrading to target version",
},
{
"Succeeded when downgrade to lower version and server version is cluster version ",
lv,
downgradeEnabledLowerVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},
{
"Failed when downgrade is enabled and local version is out of range and cluster version is nil",
nil,
downgradeEnabledHigherVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},

{
"Failed when downgrade is enabled and local version is out of range",
lv,
downgradeEnabledHigherVersion,
false,
"invalid downgrade; server version is not allowed to join when downgrade is enabled",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
sv := semver.Must(semver.NewVersion(version.Version))
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion, tt.downgrade)
err := tryMustDetectDowngrade(lg, sv, tt.clusterVersion)

if tt.success != (err == nil) {
t.Errorf("Unexpected status, got %q, wanted: %v", err, tt.success)
t.Errorf("Unexpected success, got: %v, wanted: %v", err == nil, tt.success)
// TODO test err
}
if err != nil && tt.message != fmt.Sprintf("%s", err) {
Expand All @@ -124,11 +79,11 @@ func TestMustDetectDowngrade(t *testing.T) {
}
}

func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version, d *DowngradeInfo) (err interface{}) {
func tryMustDetectDowngrade(lg *zap.Logger, sv, cv *semver.Version) (err interface{}) {
defer func() {
err = recover()
}()
MustDetectDowngrade(lg, sv, cv, d)
MustDetectDowngrade(lg, sv, cv)
return err
}

Expand Down
Loading