Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use v2 api to update cluster version #12988

Merged
merged 2 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- `etcd --experimental-backend-bbolt-freelist-type` has been deprecated.
- Support [downgrade API](https://github.com/etcd-io/etcd/pull/11715).
- Deprecate v2 apply on cluster version. [Use v3 request to set cluster version and recover cluster version from v3 backend](https://github.com/etcd-io/etcd/pull/11427).
- [Use v2 api to update cluster version to support mixed version cluster during upgrade](https://github.com/etcd-io/etcd/pull/12988).
- [Fix corruption bug in defrag](https://github.com/etcd-io/etcd/pull/11613).
- Fix [quorum protection logic when promoting a learner](https://github.com/etcd-io/etcd/pull/11640).
- Improve [peer corruption checker](https://github.com/etcd-io/etcd/pull/11621) to work when peer mTLS is enabled.
Expand Down
9 changes: 7 additions & 2 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"strconv"
"time"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"

Expand Down Expand Up @@ -92,9 +94,12 @@ func (a *applierV2store) Put(r *RequestV2) Response {
// return an empty response since there is no consumer.
return Response{}
}
// remove v2 version set to avoid the conflict between v2 and v3.
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
// return an empty response since there is no consumer.
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
}
return Response{}
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
Expand Down
51 changes: 46 additions & 5 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2428,6 +2428,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// TODO switch to updateClusterVersionV3 in 3.6
func (s *EtcdServer) monitorVersions() {
for {
select {
Expand Down Expand Up @@ -2458,27 +2459,67 @@ func (s *EtcdServer) monitorVersions() {
if v != nil {
verStr = v.String()
}
s.GoAttach(func() { s.updateClusterVersion(verStr) })
s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue
}

if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersion(v.String()) })
s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
}
}
}

func (s *EtcdServer) updateClusterVersion(ver string) {
func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()

if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version",
"setting up initial cluster version using v2 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version",
"updating cluster version using v2 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well rename this method to updateClusterVersionV2, to be clear?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also update the logging

"updating cluster version using v2 API

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"setting up initial cluster version using v2 API"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in the next revision as your comments. PTAL @gyuho

req := pb.Request{
Method: "PUT",
Path: membership.StoreClusterVersionKey(),
Val: ver,
}

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()

switch err {
case nil:
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return

case ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return

default:
lg.Warn("failed to update cluster version", zap.Error(err))
}
}

func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg := s.Logger()

if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version using v3 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version using v3 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
Expand Down
48 changes: 48 additions & 0 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"sync"
Expand Down Expand Up @@ -1716,6 +1717,53 @@ func TestPublishV3Retry(t *testing.T) {
<-ch
}

func TestUpdateVersion(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},

ctx: ctx,
cancel: cancel,
}
srv.updateClusterVersionV2("2.0.0")

action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
if r.Val != "2.0.0" {
t.Errorf("val = %s, want %s", r.Val, "2.0.0")
}
}

func TestStopNotify(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
Expand Down