Skip to content

Commit

Permalink
server: Implement compaction hash checking
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Jun 15, 2022
1 parent 71bba3c commit b1a0f1e
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 58 deletions.
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized && srvcfg.InitialCorruptCheck {
if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)

Expand Down
108 changes: 98 additions & 10 deletions server/etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand All @@ -32,10 +34,26 @@ import (
"go.uber.org/zap"
)

type corruptionMonitor struct {
type CorruptionChecker interface {
InitialCheck() error
PeriodicCheck() error
CompactHashCheck()
}

type corruptionChecker struct {
lg *zap.Logger

hasher Hasher

mux sync.RWMutex
latestRevisionChecked int64
}

func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker {
return &corruptionChecker{
lg: lg,
hasher: hasherAdapter{s, storage},
}
}

type Hasher interface {
Expand All @@ -47,13 +65,6 @@ type Hasher interface {
TriggerCorruptAlarm(uint64)
}

func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
return &corruptionMonitor{
lg: lg,
hasher: hasherAdapter{s, s.KV().HashStorage()},
}
}

type hasherAdapter struct {
*EtcdServer
mvcc.HashStorage
Expand All @@ -74,7 +85,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
// InitialCheck compares initial hash values with its peers
// before serving any peer/client traffic. Only mismatch when hashes
// are different at requested revision, with same compact revision.
func (cm *corruptionMonitor) InitialCheck() error {
func (cm *corruptionChecker) InitialCheck() error {

cm.lg.Info(
"starting initial corruption check",
Expand Down Expand Up @@ -153,7 +164,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
return nil
}

func (cm *corruptionMonitor) periodicCheck() error {
func (cm *corruptionChecker) PeriodicCheck() error {
h, rev, err := cm.hasher.HashByRev(0)
if err != nil {
return err
Expand Down Expand Up @@ -241,6 +252,83 @@ func (cm *corruptionMonitor) periodicCheck() error {
return nil
}

func (cm *corruptionChecker) CompactHashCheck() {
cm.lg.Info("starting compact hash check",
zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Duration("timeout", cm.hasher.ReqTimeout()),
)
hashes := cm.uncheckedRevisions()
// Assume that revisions are ordered from largest to smallest
for i, hash := range hashes {
peers := cm.hasher.PeerHashByRev(hash.Revision)
if len(peers) == 0 {
continue
}
peersChecked := 0
for _, p := range peers {
if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
continue
}
id := p.resp.Header.MemberId

// follower's compact revision is leader's old one, then hashes must match
if p.resp.Hash != hash.Hash {
cm.lg.Warn(
"same compact revision then hashes must match",
zap.Int64("revision", hash.Revision),
zap.Int64("leader-compact-revision", hash.CompactRevision),
zap.Uint32("leader-hash", hash.Hash),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", types.ID(id).String()),
)
cm.hasher.TriggerCorruptAlarm(id)
cm.lg.Info("compaction hash check failed")
return
}
peersChecked++
cm.lg.Info("successfully checked hash on follower",
zap.Int64("revision", hash.Revision),
zap.String("peer-id", types.ID(id).String()),
)
}
if len(peers) == peersChecked {
cm.lg.Info("successfully checked hash on whole cluster",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int64("revision", hash.Revision),
)
cm.mux.Lock()
if hash.Revision > cm.latestRevisionChecked {
cm.latestRevisionChecked = hash.Revision
}
cm.mux.Unlock()
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
return
}
}
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
return
}

func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
hashes := cm.hasher.Hashes()
sort.Slice(hashes, func(i, j int) bool {
return hashes[i].Revision > hashes[j].Revision
})

cm.mux.RLock()
lastRevisionChecked := cm.latestRevisionChecked
cm.mux.RUnlock()
i := 0
for _, hash := range hashes {
if hash.Revision <= lastRevisionChecked {
return hashes[:i]
}
i++
}
return hashes[:i]
}

func (s *EtcdServer) triggerCorruptAlarm(id uint64) {
a := &pb.AlarmRequest{
MemberID: id,
Expand Down
106 changes: 101 additions & 5 deletions server/etcdserver/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
Expand Down Expand Up @@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
err := monitor.periodicCheck()
err := monitor.PeriodicCheck()
if gotError := err != nil; gotError != tc.expectError {
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
}
Expand All @@ -221,11 +221,101 @@ func TestPeriodicCheck(t *testing.T) {
}
}

func TestCompactHashCheck(t *testing.T) {
tcs := []struct {
name string
hasher fakeHasher
lastRevisionChecked int64

expectError bool
expectCorrupt bool
expectActions []string
expectLastRevisionChecked int64
}{
{
name: "No hashes",
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"},
},
{
name: "No peers, check new checked from largest to smallest",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}},
},
lastRevisionChecked: 2,
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
expectLastRevisionChecked: 2,
},
{
name: "Peer error",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned different compaction revision is skipped",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned same compaction revision but different hash triggers alarm",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
expectCorrupt: true,
},
{
name: "Peer returned same hash bumps last revision checked",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
expectLastRevisionChecked: 2,
},
{
name: "Only one peer succeeded check",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{
{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}},
{err: fmt.Errorf("failed getting hash")},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionChecker{
latestRevisionChecked: tc.lastRevisionChecked,
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
monitor.CompactHashCheck()
if tc.hasher.alarmTriggered != tc.expectCorrupt {
t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
}
if tc.expectLastRevisionChecked != monitor.latestRevisionChecked {
t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked)
}
assert.Equal(t, tc.expectActions, tc.hasher.actions)
})
}
}

type fakeHasher struct {
peerHashes []*peerHashKVResp
hashByRevIndex int
hashByRevResponses []hashByRev
linearizableReadNotify error
hashes []mvcc.KeyValueHash

alarmTriggered bool
actions []string
Expand All @@ -251,8 +341,14 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6
return hashByRev.hash, hashByRev.revision, hashByRev.err
}

func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
panic("not implemented")
func (f *fakeHasher) Store(hash mvcc.KeyValueHash) {
f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash))
f.hashes = append(f.hashes, hash)
}

func (f *fakeHasher) Hashes() []mvcc.KeyValueHash {
f.actions = append(f.actions, "Hashes()")
return f.hashes
}

func (f *fakeHasher) ReqTimeout() time.Duration {
Expand Down
29 changes: 25 additions & 4 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ var (
// monitorVersionInterval should be smaller than the timeout
// on the connection. Or we will not be able to reuse the connection
// (since it will timeout).
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
CompactHashCheckInterval = 15 * time.Second

recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
Expand Down Expand Up @@ -295,7 +296,8 @@ type EtcdServer struct {
*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
forceSnapshot bool
corruptionChecker CorruptionChecker
}

// NewServer creates a new EtcdServer from the supplied configuration. The
Expand Down Expand Up @@ -371,6 +373,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())

srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))

Expand Down Expand Up @@ -530,6 +533,7 @@ func (s *EtcdServer) Start() {
s.GoAttach(s.monitorStorageVersion)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorCompactHash)
s.GoAttach(s.monitorDowngrade)
}

Expand Down Expand Up @@ -2199,7 +2203,6 @@ func (s *EtcdServer) monitorKVHash() {
zap.String("local-member-id", s.MemberId().String()),
zap.Duration("interval", t),
)
monitor := NewCorruptionMonitor(lg, s)
for {
select {
case <-s.stopping:
Expand All @@ -2209,12 +2212,26 @@ func (s *EtcdServer) monitorKVHash() {
if !s.isLeader() {
continue
}
if err := monitor.periodicCheck(); err != nil {
if err := s.corruptionChecker.PeriodicCheck(); err != nil {
lg.Warn("failed to check hash KV", zap.Error(err))
}
}
}

func (s *EtcdServer) monitorCompactHash() {
for {
select {
case <-time.After(CompactHashCheckInterval):
case <-s.stopping:
return
}
if !s.isLeader() {
continue
}
s.corruptionChecker.CompactHashCheck()
}
}

func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()

Expand Down Expand Up @@ -2408,6 +2425,10 @@ func (s *EtcdServer) Version() *serverversion.Manager {
return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
}

func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
return s.corruptionChecker
}

func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
return func() {
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
Expand Down
Loading

0 comments on commit b1a0f1e

Please sign in to comment.