Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

20210429 etcdctl v2 backup cindex fix #12906

Merged
merged 5 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions client/pkg/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ func BeforeTest(t TB) {
// It will detect common goroutine leaks, retrying in case there are goroutines
// not synchronously torn down, and fail the test if any goroutines are stuck.
func AfterTest(t TB) {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
// If test-failed the leaked goroutines list is hidding the real
// source of problem.
if !t.Failed() {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion client/pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func MustNewURL(t *testing.T, s string) *url.URL {
func FatalStack(t *testing.T, s string) {
stackTrace := make([]byte, 1024*1024)
n := runtime.Stack(stackTrace, true)
t.Errorf("---> Test failed: %s", s)
t.Error(string(stackTrace[:n]))
t.Fatalf(s)
t.Fatal(s)
}

// ConditionFunc returns true when a condition is met.
Expand Down
111 changes: 66 additions & 45 deletions etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package command

import (
"log"
"os"
"path"
"regexp"
Expand All @@ -33,6 +32,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"

Expand Down Expand Up @@ -90,7 +90,10 @@ func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string

lg := zap.NewExample()
lg, err := zap.NewProduction()
if err != nil {
return err
}

withV3 := c.Bool("with-v3")
srcDir := c.String("data-dir")
Expand All @@ -112,44 +115,51 @@ func handleBackup(c *cli.Context) error {
}

if err := fileutil.CreateDirAll(destSnap); err != nil {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err))
}

destDbPath := datadir.ToBackendFileName(destDir)
srcDbPath := datadir.ToBackendFileName(srcDir)
desired := newDesiredCluster()

walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
destDbPath := datadir.ToBackendFileName(destDir)
saveDB(lg, destDbPath, datadir.ToBackendFileName(srcDir), state.Commit, &desired, withV3)
metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3)

neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata))
if err != nil {
log.Fatal(err)
lg.Fatal("wal.Create failed", zap.Error(err))
}
defer neww.Close()
if err := neww.Save(state, ents); err != nil {
log.Fatal(err)
lg.Fatal("wal.Save failed ", zap.Error(err))
}
if err := neww.SaveSnapshot(walsnap); err != nil {
log.Fatal(err)
lg.Fatal("SaveSnapshot", zap.Error(err))
}

verify.MustVerifyIfEnabled(verify.Config{
Logger: lg,
DataDir: destDir,
ExactIndex: false,
})

return nil
}

func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) {
ss := snap.New(lg, srcSnap)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
lg.Fatal("saveSnap(Snapshoter.Load) failed", zap.Error(err))
}
if snapshot != nil {
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState
newss := snap.New(lg, destSnap)
snapshot.Metadata.ConfState = desired.confState
snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
lg.Fatal("saveSnap(Snapshoter.SaveSnap) failed", zap.Error(err))
}
}
return walsnap
Expand All @@ -175,37 +185,36 @@ func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredClus
return outputData
}

func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
func translateWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
w, err := wal.OpenForRead(lg, srcWAL, walsnap)
if err != nil {
log.Fatal(err)
lg.Fatal("wal.OpenForRead failed", zap.Error(err))
}
defer w.Close()
wmetadata, state, ents, err := w.ReadAll()
switch err {
case nil:
case wal.ErrSnapshotNotFound:
log.Printf("Failed to find the match snapshot record %+v in wal %v.", walsnap, srcWAL)
log.Printf("etcdctl will add it back. Start auto fixing...")
lg.Warn("failed to find the match snapshot record", zap.Any("walsnap", walsnap), zap.String("srcWAL", srcWAL))
lg.Warn("etcdctl will add it back. Start auto fixing...")
default:
log.Fatal(err)
lg.Fatal("unexpected error while reading WAL", zap.Error(err))
}

re := path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")
memberAttrRE := regexp.MustCompile(re)

removed := uint64(0)
i := 0
remove := func() {
ents = append(ents[:i], ents[i+1:]...)
removed++
i--
}
for i = 0; i < len(ents); i++ {
ents[i].Index -= removed
for i := 0; i < len(ents); i++ {

// Replacing WAL entries with 'dummy' entries allows to avoid
// complicated entries shifting and risk of other data (like consistent_index)
// running out of sync.
// Also moving entries and computing offsets would get complicated if
// TERM changes (so there are superflous entries from previous term).

if ents[i].Type == raftpb.EntryConfChange {
log.Println("ignoring EntryConfChange raft entry")
remove()
lg.Info("ignoring EntryConfChange raft entry")
raftEntryToNoOp(&ents[i])
continue
}

Expand All @@ -219,33 +228,42 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
}

if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) {
log.Println("ignoring member attribute update on", v2Req.Path)
remove()
lg.Info("ignoring member attribute update on",
zap.Stringer("entry", &ents[i]),
zap.String("v2Req.Path", v2Req.Path))
raftEntryToNoOp(&ents[i])
continue
}

if v2Req != nil {
continue
lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i]))
}

if raftReq.ClusterMemberAttrSet != nil {
log.Println("ignoring cluster_member_attr_set")
remove()
lg.Info("ignoring cluster_member_attr_set")
raftEntryToNoOp(&ents[i])
continue
}

if v3 || raftReq.Header == nil {
lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i]))
continue
}
log.Println("ignoring v3 raft entry")
remove()
lg.Info("ignoring v3 raft entry")
raftEntryToNoOp(&ents[i])
}
state.Commit -= removed
var metadata etcdserverpb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
return metadata, state, ents
}

func raftEntryToNoOp(entry *raftpb.Entry) {
// Empty (dummy) entries are send by RAFT when new leader is getting elected.
// They do not cary any change to data-model so its safe to replace entries
// to be ignored with them.
*entry = raftpb.Entry{Term: entry.Term, Index: entry.Index, Type: raftpb.EntryNormal, Data: nil}
}

// saveDB copies the v3 backend and strips cluster information.
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {

Expand All @@ -256,42 +274,42 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
go func() {
db, err := bolt.Open(srcDB, 0444, &bolt.Options{ReadOnly: true})
if err != nil {
log.Fatal(err)
lg.Fatal("bolt.Open FAILED", zap.Error(err))
}
ch <- db
}()
select {
case src = <-ch:
case <-time.After(time.Second):
log.Println("waiting to acquire lock on", srcDB)
lg.Fatal("timed out waiting to acquire lock on", zap.String("srcDB", srcDB))
src = <-ch
}
defer src.Close()

tx, err := src.Begin(false)
if err != nil {
log.Fatal(err)
lg.Fatal("bbolt.BeginTx failed", zap.Error(err))
}

// copy srcDB to destDB
dest, err := os.Create(destDB)
if err != nil {
log.Fatal(err)
lg.Fatal("creation of destination file failed", zap.String("dest", destDB), zap.Error(err))
}
if _, err := tx.WriteTo(dest); err != nil {
log.Fatal(err)
lg.Fatal("bbolt write to destination file failed", zap.String("dest", destDB), zap.Error(err))
}
dest.Close()
if err := tx.Rollback(); err != nil {
log.Fatal(err)
lg.Fatal("bbolt tx.Rollback failed", zap.String("dest", destDB), zap.Error(err))
}
}

be := backend.NewDefaultBackend(destDB)
defer be.Close()

if err := membership.TrimClusterFromBackend(be); err != nil {
log.Fatal(err)
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
}

raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
Expand All @@ -303,10 +321,13 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket([]byte("meta"))
cindex.UnsafeCreateMetaBucket(tx)
ci := cindex.NewConsistentIndex(tx)
ci.SetConsistentIndex(idx)
ci.UnsafeSave(tx)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
}

}
3 changes: 2 additions & 1 deletion etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/wal"
Expand Down Expand Up @@ -91,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
}()

readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, index)
cindex.UpdateConsistentIndex(be.BatchTx(), index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
Expand Down
13 changes: 12 additions & 1 deletion pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error {
}

// Close waits for the expect process to exit.
// Close currently does not return error if process exited with !=0 status.
// TODO: Close should expose underlying proces failure by default.
func (ep *ExpectProcess) Close() error { return ep.close(false) }

func (ep *ExpectProcess) close(kill bool) error {
Expand All @@ -162,14 +164,14 @@ func (ep *ExpectProcess) close(kill bool) error {
ep.wg.Wait()

if err != nil {
ep.err = err
if !kill && strings.Contains(err.Error(), "exit status") {
// non-zero exit code
err = nil
} else if kill && strings.Contains(err.Error(), "signal:") {
err = nil
}
}

ep.cmd = nil
return err
}
Expand All @@ -178,3 +180,12 @@ func (ep *ExpectProcess) Send(command string) error {
_, err := io.WriteString(ep.fpty, command)
return err
}

func (ep *ExpectProcess) ProcessError() error {
if strings.Contains(ep.err.Error(), "input/output error") {
// TODO: The expect library should not return
// `/dev/ptmx: input/output error` when process just exits.
return nil
}
return ep.err
}
3 changes: 3 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = e.Server.CheckInitialHashKV(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)

e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
Expand Down
Loading