Skip to content

Commit

Permalink
add sleep interval
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed May 24, 2021
1 parent b29e8e4 commit 184b0e5
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 7 deletions.
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type ServerConfig struct {
AutoCompactionRetention time.Duration
AutoCompactionMode string
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
MaxTxnOps uint

Expand Down
6 changes: 4 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,10 @@ type Config struct {
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func newConfig() *config {

fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
Expand Down
8 changes: 7 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,16 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

kvindex := ci.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))

if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
Expand Down
7 changes: 6 additions & 1 deletion server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ const (

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
var minimumBatchInterval = 10 * time.Millisecond

type StoreConfig struct {
CompactionBatchLimit int
CompactionBatchLimit int
CompactionSleepInterval time.Duration
}

type store struct {
Expand Down Expand Up @@ -96,6 +98,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
if cfg.CompactionBatchLimit == 0 {
cfg.CompactionBatchLimit = defaultCompactBatchLimit
}
if cfg.CompactionSleepInterval == 0 {
cfg.CompactionSleepInterval = minimumBatchInterval
}
s := &store{
cfg: cfg,
b: b,
Expand Down
9 changes: 6 additions & 3 deletions server/mvcc/kvstore_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))

batchNum := s.cfg.CompactionBatchLimit
batchInterval := s.cfg.CompactionSleepInterval

last := make([]byte, 8+1+8)
for {
var rev revision
Expand All @@ -40,7 +43,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc

tx := s.b.BatchTx()
tx.Lock()
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
Expand All @@ -49,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
}
}

if len(keys) < s.cfg.CompactionBatchLimit {
if len(keys) < batchNum {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
Expand All @@ -70,7 +73,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))

select {
case <-time.After(10 * time.Millisecond):
case <-time.After(batchInterval):
case <-s.stopc:
return false
}
Expand Down

0 comments on commit 184b0e5

Please sign in to comment.