Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Implement compaction hash checking #14120

Merged
merged 6 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ type ServerConfig struct {

// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
InitialCorruptCheck bool
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration

// PreVote is true to enable Raft Pre-Vote.
PreVote bool
Expand Down
14 changes: 12 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,11 @@ type Config struct {
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"`
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"`

// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
Expand Down Expand Up @@ -521,6 +524,9 @@ func NewConfig() *Config {
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalMaxLearners: membership.DefaultMaxLearners,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,

V2Deprecation: config.V2_DEPR_DEFAULT,

DiscoveryCfg: v3discovery.DiscoveryConfig{
Expand Down Expand Up @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}

if cfg.ExperimentalCompactHashCheckTime <= 0 {
serathius marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime)
}

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
Expand Down Expand Up @@ -252,7 +254,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized && srvcfg.InitialCorruptCheck {
if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)

Expand Down Expand Up @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
zap.String("auto-compaction-mode", sc.AutoCompactionMode),
zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func newConfig() *config {
// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.")
fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.")

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
Expand Down
101 changes: 95 additions & 6 deletions server/etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand All @@ -32,10 +34,19 @@ import (
"go.uber.org/zap"
)

type corruptionMonitor struct {
type CorruptionChecker interface {
InitialCheck() error
PeriodicCheck() error
CompactHashCheck()
}

type corruptionChecker struct {
lg *zap.Logger

hasher Hasher

mux sync.RWMutex
latestRevisionChecked int64
}

type Hasher interface {
Expand All @@ -47,10 +58,10 @@ type Hasher interface {
TriggerCorruptAlarm(uint64)
}

func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
return &corruptionMonitor{
func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker {
return &corruptionChecker{
lg: lg,
hasher: hasherAdapter{s, s.KV().HashStorage()},
hasher: hasherAdapter{s, storage},
}
}

Expand All @@ -74,7 +85,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
// InitialCheck compares initial hash values with its peers
// before serving any peer/client traffic. Only mismatch when hashes
// are different at requested revision, with same compact revision.
func (cm *corruptionMonitor) InitialCheck() error {
func (cm *corruptionChecker) InitialCheck() error {

cm.lg.Info(
"starting initial corruption check",
Expand Down Expand Up @@ -153,7 +164,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
return nil
}

func (cm *corruptionMonitor) periodicCheck() error {
func (cm *corruptionChecker) PeriodicCheck() error {
h, rev, err := cm.hasher.HashByRev(0)
if err != nil {
return err
Expand Down Expand Up @@ -241,6 +252,84 @@ func (cm *corruptionMonitor) periodicCheck() error {
return nil
}

func (cm *corruptionChecker) CompactHashCheck() {
cm.lg.Info("starting compact hash check",
zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Duration("timeout", cm.hasher.ReqTimeout()),
)
hashes := cm.uncheckedRevisions()
// Assume that revisions are ordered from largest to smallest
for i, hash := range hashes {
peers := cm.hasher.PeerHashByRev(hash.Revision)
if len(peers) == 0 {
continue
}
peersChecked := 0
for _, p := range peers {
if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// follower's compact revision is leader's old one, then hashes must match
if p.resp.Hash != hash.Hash {
cm.hasher.TriggerCorruptAlarm(uint64(p.id))
cm.lg.Error("failed compaction hash check",
zap.Int64("revision", hash.Revision),
zap.Int64("leader-compact-revision", hash.CompactRevision),
zap.Uint32("leader-hash", hash.Hash),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", p.id.String()),
)
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest not to return here; instead, we should continue to compare other peers' hash, and generate multiple alrams if needed.

Copy link
Member Author

@serathius serathius Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you are proposing is not consistent with periodic check which only raises one alarm even though there could be multiple members corrupted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we at least finish comparing all peers' hashes and generate a info or error log? I think it's helpful for 3.5, at least we can tell the hashes from log.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will heavily complicate implementation. There is nothing in those logs that users cannot get themselves.

Copy link
Member

@ahrtr ahrtr Jul 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine a situation for 3.5, when the compact hash checker raises an alarm CORRUPT, then users must figure out manually which member's data is corrupted. If all member's revisions are exactly the same, and the corruption is just caused by some disk I/O bit flip or disk corruption. Then users must get all members' log, and find out all the log entries below, and compare the hashes themselves.

{"level":"info","ts":"2022-07-26T15:26:01.386+0800","caller":"mvcc/kvstore_compaction.go:65","msg":"finished scheduled compaction","compact-revision":9,"took":"40.078µs","hash":1087596250}

If we finish comparing all peers' hashes here and print a info or error log, then the users can easily figure out which member's data is corrupted.

Actually It just a minor code change, and I don't think it complicate the implementation.

For 3.6, we can do some additional enhancement, but for 3.5 the users can only use the log to do analysis themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, we can do it in a separate PR.

}
peersChecked++
cm.lg.Info("successfully checked hash on follower",
zap.Int64("revision", hash.Revision),
zap.String("peer-id", p.id.String()),
)
}
if len(peers) == peersChecked {
cm.lg.Info("successfully checked hash on whole cluster",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int64("revision", hash.Revision),
)
cm.mux.Lock()
if hash.Revision > cm.latestRevisionChecked {
cm.latestRevisionChecked = hash.Revision
}
cm.mux.Unlock()
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the hashes entries are ordered in descending order, so you are intentionally return here if the hash checking on the latest revision is successful?

Is it possible that the latest [compactionRev, rev] hashes match, but previous ones do NOT match?

Copy link
Member Author

@serathius serathius Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it could as a corruption can technically self healed. Does it mean etcd is corrupted? In my opinion no.

For example imagine that there PUT requests that was not reflected on all members. It means that hashes between members will not match. However, sometime later comes a DELETE request on the same key and it is reflected. That would restore the hash between members and invalidate previous corruption. If we look through cached hashes, the latest one will match, even though the old ones don't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to intentionally create such situation in which previous hashes do not match and the latest hashes match, but one member's data is indeed corrupted, but eventually I failed to create such situation. The root cause is etcd scans the whole KEY space starting from 0x00 to the specified compactMainRev; Specifically, the last isn't set to the prevCompactRev.

So in summary, the case that the latest [compactionRev, rev] hashes match, but previous ones do NOT match will never happen. And I think it's more safe to scan the whole KEY space instead of starting from prevCompactRev.

}
serathius marked this conversation as resolved.
Show resolved Hide resolved
cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", len(peers)),
zap.Int64("revision", hash.Revision),
Copy link
Member

@ahrtr ahrtr Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to print the member IDs which are or aren't checked in this loop?

Copy link
Member Author

@serathius serathius Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a log above that list all peers that were successfully checked. Writing both set of checked and it's complement is overkill

)
}
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
return
}

func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
cm.mux.RLock()
lastRevisionChecked := cm.latestRevisionChecked
cm.mux.RUnlock()

hashes := cm.hasher.Hashes()
// Sort in descending order
sort.Slice(hashes, func(i, j int) bool {
return hashes[i].Revision > hashes[j].Revision
})
for i, hash := range hashes {
if hash.Revision <= lastRevisionChecked {
return hashes[:i]
}
}
return hashes
}

func (s *EtcdServer) triggerCorruptAlarm(id uint64) {
a := &pb.AlarmRequest{
MemberID: id,
Expand Down
106 changes: 101 additions & 5 deletions server/etcdserver/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
Expand Down Expand Up @@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
err := monitor.periodicCheck()
err := monitor.PeriodicCheck()
if gotError := err != nil; gotError != tc.expectError {
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
}
Expand All @@ -221,11 +221,101 @@ func TestPeriodicCheck(t *testing.T) {
}
}

func TestCompactHashCheck(t *testing.T) {
tcs := []struct {
name string
hasher fakeHasher
lastRevisionChecked int64

expectError bool
expectCorrupt bool
expectActions []string
expectLastRevisionChecked int64
}{
{
name: "No hashes",
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"},
},
{
name: "No peers, check new checked from largest to smallest",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}},
},
lastRevisionChecked: 2,
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
expectLastRevisionChecked: 2,
},
{
name: "Peer error",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned different compaction revision is skipped",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned same compaction revision but different hash triggers alarm",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
expectCorrupt: true,
},
{
name: "Peer returned same hash bumps last revision checked",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
expectLastRevisionChecked: 2,
},
{
name: "Only one peer succeeded check",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{
{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}},
{err: fmt.Errorf("failed getting hash")},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionChecker{
latestRevisionChecked: tc.lastRevisionChecked,
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
monitor.CompactHashCheck()
if tc.hasher.alarmTriggered != tc.expectCorrupt {
t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
}
if tc.expectLastRevisionChecked != monitor.latestRevisionChecked {
t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked)
}
assert.Equal(t, tc.expectActions, tc.hasher.actions)
})
}
}

type fakeHasher struct {
peerHashes []*peerHashKVResp
hashByRevIndex int
hashByRevResponses []hashByRev
linearizableReadNotify error
hashes []mvcc.KeyValueHash

alarmTriggered bool
actions []string
Expand All @@ -251,8 +341,14 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6
return hashByRev.hash, hashByRev.revision, hashByRev.err
}

func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
panic("not implemented")
func (f *fakeHasher) Store(hash mvcc.KeyValueHash) {
f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash))
f.hashes = append(f.hashes, hash)
}

func (f *fakeHasher) Hashes() []mvcc.KeyValueHash {
f.actions = append(f.actions, "Hashes()")
return f.hashes
}

func (f *fakeHasher) ReqTimeout() time.Duration {
Expand Down
Loading