Skip to content

Commit

Permalink
Merge 1ea53d5 into a5b9f72
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius authored Apr 7, 2022
2 parents a5b9f72 + 1ea53d5 commit 12504df
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
Expand Down
2 changes: 1 addition & 1 deletion etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
defer be.Close()

cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
return nil
}
4 changes: 2 additions & 2 deletions server/etcdserver/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
// create snapshot db file: "%016x.snap.db"
be := serverstorage.OpenBackend(cfg, nil)
schema.CreateMetaBucket(be.BatchTx())
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm, false)
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm)
schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, be.BatchTx(), &confState)
if err = be.Close(); err != nil {
return
Expand All @@ -301,6 +301,6 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
// create backend db file
be = serverstorage.OpenBackend(cfg, nil)
schema.CreateMetaBucket(be.BatchTx())
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1, false)
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1)
return be.Close()
}
7 changes: 3 additions & 4 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

type Backend interface {
ReadTx() backend.ReadTx
BatchTx() backend.BatchTx
}

// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
Expand Down Expand Up @@ -119,7 +118,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
schema.UnsafeUpdateConsistentIndex(tx, index, term, true)
schema.UnsafeUpdateConsistentIndex(tx, index, term)
}

func (ci *consistentIndex) SetBackend(be Backend) {
Expand Down Expand Up @@ -170,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
tx.LockOutsideApply()
defer tx.Unlock()
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
schema.UnsafeUpdateConsistentIndex(tx, index, term)
}
52 changes: 52 additions & 0 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,58 @@ func TestConsistentIndex(t *testing.T) {
assert.Equal(t, r, index)
}

func TestConsistentIndexDecrease(t *testing.T) {
initIndex := uint64(100)
initTerm := uint64(10)

tcs := []struct {
name string
index uint64
term uint64
}{
{
name: "Decrease term",
index: initIndex + 1,
term: initTerm - 1,
},
{
name: "Decrease CI",
index: initIndex - 1,
term: initTerm + 1,
},
{
name: "Decrease CI and term",
index: initIndex - 1,
term: initTerm - 1,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
tx.Unlock()
be.ForceCommit()
be.Close()

be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
tx = be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
assert.Equal(t, tc.index, ci.ConsistentIndex())

ci = NewConsistentIndex(be)
assert.Equal(t, tc.index, ci.ConsistentIndex())
})
}
}

func TestFakeConsistentIndex(t *testing.T) {

r := rand.Uint64()
Expand Down
24 changes: 2 additions & 22 deletions server/storage/schema/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package schema

import (
"encoding/binary"

"go.etcd.io/etcd/server/v3/storage/backend"
)

Expand Down Expand Up @@ -56,32 +57,11 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
return UnsafeReadConsistentIndex(tx)
}

func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't load the real index yet.
return
}

if onlyGrow {
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if index > oldi {
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
}
if term > 0 && term > oldTerm {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
}
return
}

bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
Expand Down
8 changes: 4 additions & 4 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestValidate(t *testing.T) {
version: V3_5,
overrideKeys: func(tx backend.BatchTx) {
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
},
},
{
Expand Down Expand Up @@ -313,14 +313,14 @@ func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx
case V3_4:
case V3_5:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
case V3_6:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
UnsafeSetStorageVersion(tx, &V3_6)
case V3_7:
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
UnsafeUpdateConsistentIndex(tx, 1, 1)
UnsafeSetStorageVersion(tx, &V3_7)
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
default:
Expand Down

0 comments on commit 12504df

Please sign in to comment.