diff --git a/server/config/config.go b/server/config/config.go index 9ecfc146336..5206b3dc5f9 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -147,8 +147,10 @@ type ServerConfig struct { // InitialCorruptCheck is true to check data corruption on boot // before serving any peer/client traffic. - InitialCorruptCheck bool - CorruptCheckTime time.Duration + InitialCorruptCheck bool + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration // PreVote is true to enable Raft Pre-Vote. PreVote bool diff --git a/server/embed/config.go b/server/embed/config.go index 4e1f6a19c07..af4e2524182 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -320,8 +320,11 @@ type Config struct { // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` - ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` + ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"` + ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. @@ -521,6 +524,9 @@ func NewConfig() *Config { ExperimentalTxnModeWriteWithSharedBuffer: true, ExperimentalMaxLearners: membership.DefaultMaxLearners, + ExperimentalCompactHashCheckEnabled: false, + ExperimentalCompactHashCheckTime: time.Minute, + V2Deprecation: config.V2_DEPR_DEFAULT, DiscoveryCfg: v3discovery.DiscoveryConfig{ @@ -759,6 +765,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") } + if cfg.ExperimentalCompactHashCheckTime <= 0 { + return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime) + } + return nil } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 6332917a7ad..3d94b63be1e 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { HostWhitelist: cfg.HostWhitelist, InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, + CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled, + CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime, PreVote: cfg.PreVote, Logger: cfg.logger, ForceNewCluster: cfg.ForceNewCluster, @@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), + zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled), + zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime), zap.String("auto-compaction-mode", sc.AutoCompactionMode), zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention), zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 28f81e33eba..b14191a95cb 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -259,6 +259,8 @@ func newConfig() *config { // experimental fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers compaction hashes.") + fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks followers compaction hashes.") fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") // TODO: delete in v3.7 diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 818d902ed7a..8ccde507c2c 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -301,13 +301,12 @@ func (cm *corruptionChecker) CompactHashCheck() { cm.mux.Unlock() cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1)) return - } else { - cm.lg.Warn("skipped checking hash; was not able to check all peers", - zap.Int("number-of-peers-checked", peersChecked), - zap.Int("number-of-peers", len(peers)), - zap.Int64("revision", hash.Revision), - ) } + cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers", + zap.Int("number-of-peers-checked", peersChecked), + zap.Int("number-of-peers", len(peers)), + zap.Int64("revision", hash.Revision), + ) } cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes))) return diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 40d9b7ea775..99a2159d993 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -111,8 +111,7 @@ var ( // monitorVersionInterval should be smaller than the timeout // on the connection. Or we will not be able to reuse the connection // (since it will timeout). - monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second - CompactHashCheckInterval = 15 * time.Second + monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes)) storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")) @@ -2219,9 +2218,13 @@ func (s *EtcdServer) monitorKVHash() { } func (s *EtcdServer) monitorCompactHash() { + if !s.Cfg.CompactHashCheckEnabled { + return + } + t := s.Cfg.CompactHashCheckTime for { select { - case <-time.After(CompactHashCheckInterval): + case <-time.After(t): case <-s.stopping: return } diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 247dbd7f8a4..ae8c32350cd 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/storage/datadir" "go.etcd.io/etcd/server/v3/storage/mvcc/testutil" "go.etcd.io/etcd/tests/v3/framework/config" @@ -136,10 +135,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) { } func TestCompactHashCheckDetectCorruption(t *testing.T) { + checkTime := time.Second e2e.BeforeTest(t) epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ - ClusterSize: 3, - KeepDataDir: true, + ClusterSize: 3, + KeepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, }) if err != nil { t.Fatalf("could not start etcd process cluster (%v)", err) @@ -173,7 +175,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err) _, err = cc.Compact(5, config.CompactOption{}) assert.NoError(t, err) - time.Sleep(etcdserver.CompactHashCheckInterval * 11 / 10) + time.Sleep(checkTime * 11 / 10) alarmResponse, err := cc.AlarmList() assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index a0ff8a83511..411bc34b1ce 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -177,8 +177,10 @@ type EtcdProcessClusterConfig struct { DiscoveryToken string LogLevel string - MaxConcurrentStreams uint32 // default is math.MaxUint32 - CorruptCheckTime time.Duration + MaxConcurrentStreams uint32 // default is math.MaxUint32 + CorruptCheckTime time.Duration + CompactHashCheckEnabled bool + CompactHashCheckTime time.Duration } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -351,6 +353,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* if cfg.CorruptCheckTime != 0 { args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime)) } + if cfg.CompactHashCheckEnabled { + args = append(args, "--experimental-compact-hash-check-enabled") + } + if cfg.CompactHashCheckTime != 0 { + args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String()) + } etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg,