Skip to content

Commit

Permalink
pump: Extract function and add unit tests (#503)
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku authored and july2993 committed Apr 10, 2019
1 parent b03249e commit f5a60c3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 25 deletions.
52 changes: 27 additions & 25 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,12 @@ func NewAppend(dir string, options *Options) (append *Append, err error) {
}

// NewAppendWithResolver returns a instance of Append
// if tiStore and tiLockResolver is not nil, we will try to query tikv to know weather a txn is committed
// if tiStore and tiLockResolver is not nil, we will try to query tikv to know whether a txn is committed
func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiLockResolver *tikv.LockResolver) (append *Append, err error) {
if options == nil {
options = DefaultOptions()
}

kvDir := path.Join(dir, "kv")
valueDir := path.Join(dir, "value")
err = os.MkdirAll(valueDir, 0755)
if err != nil {
Expand All @@ -113,28 +112,8 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
return nil, errors.Trace(err)
}

var opt opt.Options
cf := options.Storage
if cf == nil {
cf = defaultStorageConfig
} else {
setDefaultStorageConfig(cf)
}

log.Infof("storage config: %+v", cf)

opt.BlockCacheCapacity = cf.BlockCacheCapacity
opt.BlockRestartInterval = cf.BlockRestartInterval
opt.BlockSize = cf.BlockSize
opt.CompactionL0Trigger = cf.CompactionL0Trigger
opt.CompactionTableSize = cf.CompactionTableSize
opt.CompactionTotalSize = cf.CompactionTotalSize
opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier
opt.WriteBuffer = cf.WriteBuffer
opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger
opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger

metadata, err := leveldb.OpenFile(kvDir, &opt)
kvDir := path.Join(dir, "kv")
metadata, err := openMetadataDB(kvDir, options.Storage)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -229,7 +208,6 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL

append.wg.Add(1)
go append.updateStatus()

return
}

Expand Down Expand Up @@ -1078,3 +1056,27 @@ func setDefaultStorageConfig(cf *Config) {
cf.WriteL0SlowdownTrigger = defaultStorageConfig.WriteL0SlowdownTrigger
}
}

func openMetadataDB(kvDir string, cf *Config) (*leveldb.DB, error) {
if cf == nil {
cf = defaultStorageConfig
} else {
setDefaultStorageConfig(cf)
}

log.Infof("Storage config: %+v", cf)

var opt opt.Options
opt.BlockCacheCapacity = cf.BlockCacheCapacity
opt.BlockRestartInterval = cf.BlockRestartInterval
opt.BlockSize = cf.BlockSize
opt.CompactionL0Trigger = cf.CompactionL0Trigger
opt.CompactionTableSize = cf.CompactionTableSize
opt.CompactionTotalSize = cf.CompactionTotalSize
opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier
opt.WriteBuffer = cf.WriteBuffer
opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger
opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger

return leveldb.OpenFile(kvDir, &opt)
}
29 changes: 29 additions & 0 deletions pump/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,32 @@ func (as *AppendSuit) TestResolve(c *check.C) {
// TODO test the case we query tikv to know weather a txn a commit
// is there a fake or mock kv.Storage and tikv.LockResolver to easy the test?
}

type OpenDBSuit struct {
dir string
}

var _ = check.Suite(&OpenDBSuit{})

func (s *OpenDBSuit) SetUpTest(c *check.C) {
s.dir = c.MkDir()
}

func (s *OpenDBSuit) TestWhenConfigIsNotProvided(c *check.C) {
_, err := openMetadataDB(s.dir, nil)
c.Assert(err, check.IsNil)
}

func (s *OpenDBSuit) TestProvidedConfigValsNotOverwritten(c *check.C) {
cf := Config{
KVConfig: KVConfig{
BlockRestartInterval: 32,
WriteL0PauseTrigger: 12,
},
}
_, err := openMetadataDB(s.dir, &cf)
c.Assert(err, check.IsNil)
c.Assert(cf.BlockRestartInterval, check.Equals, 32)
c.Assert(cf.WriteL0PauseTrigger, check.Equals, 12)
c.Assert(cf.BlockCacheCapacity, check.Equals, defaultStorageConfig.BlockCacheCapacity)
}

0 comments on commit f5a60c3

Please sign in to comment.