Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-3.5: backport set version panic fix, ARM64 tests #12990

Merged
merged 2 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
name: Linux ARM64 Graviton2

on:
push:
branches: [main]
pull_request:
branches: [main]
on: [push, pull_request]

jobs:
test:
Expand Down
9 changes: 7 additions & 2 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"strconv"
"time"

"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"

Expand Down Expand Up @@ -92,9 +94,12 @@ func (a *applierV2store) Put(r *RequestV2) Response {
// return an empty response since there is no consumer.
return Response{}
}
// remove v2 version set to avoid the conflict between v2 and v3.
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
// return an empty response since there is no consumer.
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
}
return Response{}
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
Expand Down
51 changes: 46 additions & 5 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2428,6 +2428,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// TODO switch to updateClusterVersionV3 in 3.6
func (s *EtcdServer) monitorVersions() {
for {
select {
Expand Down Expand Up @@ -2458,27 +2459,67 @@ func (s *EtcdServer) monitorVersions() {
if v != nil {
verStr = v.String()
}
s.GoAttach(func() { s.updateClusterVersion(verStr) })
s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue
}

if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersion(v.String()) })
s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
}
}
}

func (s *EtcdServer) updateClusterVersion(ver string) {
func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()

if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version",
"setting up initial cluster version using v2 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version",
"updating cluster version using v2 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
}

req := pb.Request{
Method: "PUT",
Path: membership.StoreClusterVersionKey(),
Val: ver,
}

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
_, err := s.Do(ctx, req)
cancel()

switch err {
case nil:
lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
return

case ErrStopped:
lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
return

default:
lg.Warn("failed to update cluster version", zap.Error(err))
}
}

func (s *EtcdServer) updateClusterVersionV3(ver string) {
lg := s.Logger()

if s.cluster.Version() == nil {
lg.Info(
"setting up initial cluster version using v3 API",
zap.String("cluster-version", version.Cluster(ver)),
)
} else {
lg.Info(
"updating cluster version using v3 API",
zap.String("from", version.Cluster(s.cluster.Version().String())),
zap.String("to", version.Cluster(ver)),
)
Expand Down
48 changes: 48 additions & 0 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"net/http"
"os"
"path"
"path/filepath"
"reflect"
"sync"
Expand Down Expand Up @@ -1716,6 +1717,53 @@ func TestPublishV3Retry(t *testing.T) {
<-ch
}

func TestUpdateVersion(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},

ctx: ctx,
cancel: cancel,
}
srv.updateClusterVersionV2("2.0.0")

action := n.Action()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
if action[0].Name != "Propose" {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
if r.Val != "2.0.0" {
t.Errorf("val = %s, want %s", r.Val, "2.0.0")
}
}

func TestStopNotify(t *testing.T) {
s := &EtcdServer{
lgMu: new(sync.RWMutex),
Expand Down