From 5c824ddff7567e7867edbfd91a48acc2b4d5a60b Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 4 Jan 2017 17:01:31 -0800 Subject: [PATCH] mvcc: txns and r/w views Clean-up of the mvcc interfaces to use txn interfaces instead of an id. Adds support for concurrent read-only mvcc transactions. Fixes #7083 --- mvcc/kv.go | 78 ++++-- mvcc/kv_test.go | 95 +++---- mvcc/kv_view.go | 53 ++++ mvcc/kvstore.go | 390 ++++------------------------- mvcc/kvstore_bench_test.go | 16 +- mvcc/kvstore_test.go | 86 +++---- mvcc/kvstore_txn.go | 254 +++++++++++++++++++ mvcc/metrics_txn.go | 67 +++++ mvcc/watchable_store.go | 150 +++-------- mvcc/watchable_store_bench_test.go | 8 +- mvcc/watchable_store_txn.go | 53 ++++ 11 files changed, 641 insertions(+), 609 deletions(-) create mode 100644 mvcc/kv_view.go create mode 100644 mvcc/kvstore_txn.go create mode 100644 mvcc/metrics_txn.go create mode 100644 mvcc/watchable_store_txn.go diff --git a/mvcc/kv.go b/mvcc/kv.go index c851c8725e88..e13cd647948e 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -32,15 +32,15 @@ type RangeResult struct { Count int } -type KV interface { - // Rev returns the current revision of the KV. - Rev() int64 - - // FirstRev returns the first revision of the KV. +type ReadView interface { + // FirstRev returns the first KV revision at the time of opening the txn. // After a compaction, the first revision increases to the compaction // revision. FirstRev() int64 + // Rev returns the revision of the KV at the time of opening the txn. + Rev() int64 + // Range gets the keys in the range at rangeRev. // The returned rev is the current revision of the KV when the operation is executed. // If rangeRev <=0, range gets the keys at currentRev. @@ -50,14 +50,17 @@ type KV interface { // Limit limits the number of keys returned. // If the required rev is compacted, ErrCompacted will be returned. Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) +} - // Put puts the given key, value into the store. Put also takes additional argument lease to - // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease - // id. - // A put also increases the rev of the store, and generates one event in the event history. - // The returned rev is the current revision of the KV when the operation is executed. - Put(key, value []byte, lease lease.LeaseID) (rev int64) +// TxnRead represents a read-only transaction with operations that will not +// block other read transactions. +type TxnRead interface { + ReadView + // End marks the transaction is complete and ready to commit. + End() +} +type WriteView interface { // DeleteRange deletes the given range from the store. // A deleteRange increases the rev of the store if any key in the range exists. // The number of key deleted will be returned. @@ -67,26 +70,49 @@ type KV interface { // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). DeleteRange(key, end []byte) (n, rev int64) - // TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked - // until txn ends. Only one on-going txn is allowed. - // TxnBegin returns an int64 txn ID. - // All txn prefixed operations with same txn ID will be done with the same rev. - TxnBegin() int64 - // TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned. - TxnEnd(txnID int64) error - // TxnRange returns the current revision of the KV when the operation is executed. - TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) - TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) - TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) + // Put puts the given key, value into the store. Put also takes additional argument lease to + // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease + // id. + // A put also increases the rev of the store, and generates one event in the event history. + // The returned rev is the current revision of the KV when the operation is executed. + Put(key, value []byte, lease lease.LeaseID) (rev int64) +} - // Compact frees all superseded keys with revisions less than rev. - Compact(rev int64) (<-chan struct{}, error) +// TxnWrite represents a transaction that can modify the store. +type TxnWrite interface { + TxnRead + WriteView + // Changes gets the changes made since opening the write txn. + Changes() []mvccpb.KeyValue +} + +// txnReadWrite coerces a read txn to a write, panicking on any write operation. +type txnReadWrite struct{ TxnRead } + +func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") } +func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { + panic("unexpected Put") +} +func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") } + +type KV interface { + ReadView + WriteView + + // Read creates a read transaction. + Read() TxnRead + + // Write creates a write transaction. + Write() TxnWrite // Hash retrieves the hash of KV state and revision. - // This method is designed for consistency checking purpose. + // This method is designed for consistency checking purposes. Hash() (hash uint32, revision int64, err error) - // Commit commits txns into the underlying backend. + // Compact frees all superseded keys with revisions less than rev. + Compact(rev int64) (<-chan struct{}, error) + + // Commit commits outstanding txns into the underlying backend. Commit() // Restore restores the KV store from a backend. diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 721d70a57f92..162dac40b5cc 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -43,35 +43,27 @@ var ( return kv.Range(key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - return kv.TxnRange(id, key, end, ro) + txn := kv.Read() + defer txn.End() + return txn.Range(key, end, ro) } normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { return kv.Put(key, value, lease) } txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - rev, err := kv.TxnPut(id, key, value, lease) - if err != nil { - panic("txn put error") - } - return rev + txn := kv.Write() + defer txn.End() + return txn.Put(key, value, lease) } normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { return kv.DeleteRange(key, end) } txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { - id := kv.TxnBegin() - defer kv.TxnEnd(id) - n, rev, err := kv.TxnDeleteRange(id, key, end) - if err != nil { - panic("txn delete error") - } - return n, rev + txn := kv.Write() + defer txn.End() + return txn.DeleteRange(key, end) } ) @@ -142,7 +134,7 @@ func testKVRange(t *testing.T, f rangeFunc) { } func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } -func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } +func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() @@ -178,7 +170,7 @@ func testKVRangeRev(t *testing.T, f rangeFunc) { } func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } -func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } +func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { b, tmpPath := backend.NewDefaultTmpBackend() @@ -404,17 +396,16 @@ func TestKVOperationInSequence(t *testing.T) { } } -func TestKVTxnBlockNonTxnOperations(t *testing.T) { +func TestKVTxnBlockWriteOperations(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(b, &lease.FakeLessor{}, nil) tests := []func(){ - func() { s.Range([]byte("foo"), nil, RangeOptions{}) }, func() { s.Put([]byte("foo"), nil, lease.NoLease) }, func() { s.DeleteRange([]byte("foo"), nil) }, } for i, tt := range tests { - id := s.TxnBegin() + txn := s.Write() done := make(chan struct{}, 1) go func() { tt() @@ -426,7 +417,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) { case <-time.After(10 * time.Millisecond): } - s.TxnEnd(id) + txn.End() select { case <-done: case <-time.After(10 * time.Second): @@ -438,39 +429,23 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) { cleanup(s, b, tmpPath) } -func TestKVTxnWrongID(t *testing.T) { +func TestKVTxnNonBlockRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(b, &lease.FakeLessor{}, nil) defer cleanup(s, b, tmpPath) - id := s.TxnBegin() - wrongid := id + 1 - - tests := []func() error{ - func() error { - _, err := s.TxnRange(wrongid, []byte("foo"), nil, RangeOptions{}) - return err - }, - func() error { - _, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease) - return err - }, - func() error { - _, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil) - return err - }, - func() error { return s.TxnEnd(wrongid) }, - } - for i, tt := range tests { - err := tt() - if err != ErrTxnIDMismatch { - t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch) - } - } + txn := s.Write() + defer txn.End() - err := s.TxnEnd(id) - if err != nil { - t.Fatalf("end err = %+v, want %+v", err, nil) + donec := make(chan struct{}) + go func() { + defer close(donec) + s.Range([]byte("foo"), nil, RangeOptions{}) + }() + select { + case <-donec: + case <-time.After(100 * time.Millisecond): + t.Fatalf("range operation blocked on write txn") } } @@ -481,19 +456,16 @@ func TestKVTxnOperationInSequence(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { - id := s.TxnBegin() + txn := s.Write() base := int64(i + 1) // put foo - rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease) - if err != nil { - t.Fatal(err) - } + rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease) if rev != base+1 { t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1) } - r, err := s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Fatal(err) } @@ -508,15 +480,12 @@ func TestKVTxnOperationInSequence(t *testing.T) { } // delete foo - n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil) - if err != nil { - t.Fatal(err) - } + n, rev := txn.DeleteRange([]byte("foo"), nil) if n != 1 || rev != base+1 { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1) } - r, err = s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1}) + r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1}) if err != nil { t.Errorf("#%d: range error (%v)", i, err) } @@ -527,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1) } - s.TxnEnd(id) + txn.End() } } diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go new file mode 100644 index 000000000000..f40ba8edc22b --- /dev/null +++ b/mvcc/kv_view.go @@ -0,0 +1,53 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "github.com/coreos/etcd/lease" +) + +type readView struct{ kv KV } + +func (rv *readView) FirstRev() int64 { + tr := rv.kv.Read() + defer tr.End() + return tr.FirstRev() +} + +func (rv *readView) Rev() int64 { + tr := rv.kv.Read() + defer tr.End() + return tr.Rev() +} + +func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + tr := rv.kv.Read() + defer tr.End() + return tr.Range(key, end, ro) +} + +type writeView struct{ kv KV } + +func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { + tw := wv.kv.Write() + defer tw.End() + return tw.DeleteRange(key, end) +} + +func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { + tw := wv.kv.Write() + defer tw.End() + return tw.Put(key, value, lease) +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f9f78c155472..6a0e4167dd0a 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "errors" "math" - "math/rand" "sync" "time" @@ -45,10 +44,10 @@ var ( scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") - ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch") - ErrCompacted = errors.New("mvcc: required revision has been compacted") - ErrFutureRev = errors.New("mvcc: required revision is a future revision") - ErrCanceled = errors.New("mvcc: watcher is canceled") + ErrCompacted = errors.New("mvcc: required revision has been compacted") + ErrFutureRev = errors.New("mvcc: required revision is a future revision") + ErrCanceled = errors.New("mvcc: watcher is canceled") + ErrClosed = errors.New("mvcc: closed") plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") ) @@ -61,7 +60,11 @@ type ConsistentIndexGetter interface { } type store struct { - mu sync.Mutex // guards the following + ReadView + WriteView + + // mu read locks for txns and write locks for non-txn store changes. + mu sync.RWMutex ig ConsistentIndexGetter @@ -70,19 +73,19 @@ type store struct { le lease.Lessor - currentRev revision - // the main revision of the last compaction + // revMuLock protects currentRev and compactMainRev. + // Locked at end of write txn and released after write txn unlock lock. + // Locked before locking read txn and released after locking. + revMu sync.RWMutex + // currentRev is the revision of the last completed transaction. + currentRev int64 + // compactMainRev is the main revision of the last compaction. compactMainRev int64 - tx backend.BatchTx - txnID int64 // tracks the current txnID to verify txn operations - txnModify bool - // bytesBuf8 is a byte slice of length 8 // to avoid a repetitive allocation in saveIndex. bytesBuf8 []byte - changes []mvccpb.KeyValue fifoSched schedule.Scheduler stopc chan struct{} @@ -98,7 +101,7 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto le: le, - currentRev: revision{main: 1}, + currentRev: 1, compactMainRev: -1, bytesBuf8: make([]byte, 8), @@ -106,9 +109,10 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto stopc: make(chan struct{}), } - + s.ReadView = &readView{s} + s.WriteView = &writeView{s} if s.le != nil { - s.le.SetRangeDeleter(s) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) } tx := s.b.BatchTx() @@ -126,140 +130,6 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto return s } -func (s *store) Rev() int64 { - s.mu.Lock() - defer s.mu.Unlock() - - return s.currentRev.main -} - -func (s *store) FirstRev() int64 { - s.mu.Lock() - defer s.mu.Unlock() - - return s.compactMainRev -} - -func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 { - id := s.TxnBegin() - s.put(key, value, lease) - s.txnEnd(id) - - putCounter.Inc() - - return int64(s.currentRev.main) -} - -func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - id := s.TxnBegin() - kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count) - s.txnEnd(id) - - rangeCounter.Inc() - - r = &RangeResult{ - KVs: kvs, - Count: count, - Rev: rev, - } - - return r, err -} - -func (s *store) DeleteRange(key, end []byte) (n, rev int64) { - id := s.TxnBegin() - n = s.deleteRange(key, end) - s.txnEnd(id) - - deleteCounter.Inc() - - return n, int64(s.currentRev.main) -} - -func (s *store) TxnBegin() int64 { - s.mu.Lock() - s.currentRev.sub = 0 - s.tx = s.b.BatchTx() - s.tx.Lock() - - s.txnID = rand.Int63() - return s.txnID -} - -func (s *store) TxnEnd(txnID int64) error { - err := s.txnEnd(txnID) - if err != nil { - return err - } - - txnCounter.Inc() - return nil -} - -// txnEnd is used for unlocking an internal txn. It does -// not increase the txnCounter. -func (s *store) txnEnd(txnID int64) error { - if txnID != s.txnID { - return ErrTxnIDMismatch - } - - // only update index if the txn modifies the mvcc state. - // read only txn might execute with one write txn concurrently, - // it should not write its index to mvcc. - if s.txnModify { - s.saveIndex() - } - s.txnModify = false - - s.tx.Unlock() - if s.currentRev.sub != 0 { - s.currentRev.main += 1 - } - s.currentRev.sub = 0 - - dbTotalSize.Set(float64(s.b.Size())) - s.mu.Unlock() - return nil -} - -func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - if txnID != s.txnID { - return nil, ErrTxnIDMismatch - } - - kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count) - - r = &RangeResult{ - KVs: kvs, - Count: count, - Rev: rev, - } - return r, err -} - -func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { - if txnID != s.txnID { - return 0, ErrTxnIDMismatch - } - - s.put(key, value, lease) - return int64(s.currentRev.main + 1), nil -} - -func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - if txnID != s.txnID { - return 0, 0, ErrTxnIDMismatch - } - - n = s.deleteRange(key, end) - if n != 0 || s.currentRev.sub != 0 { - rev = int64(s.currentRev.main + 1) - } else { - rev = int64(s.currentRev.main) - } - return n, rev, nil -} - func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { if ctx == nil || ctx.Err() != nil { s.mu.Lock() @@ -275,16 +145,32 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { close(ch) } +func (s *store) Hash() (hash uint32, revision int64, err error) { + // TODO: nothing should be able to call into backend when closed + select { + case <-s.stopc: + return 0, 0, ErrClosed + default: + } + + s.b.ForceCommit() + h, err := s.b.Hash(DefaultIgnores) + return h, s.currentRev, err +} + func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() + s.revMu.Lock() + defer s.revMu.Unlock() + if rev <= s.compactMainRev { ch := make(chan struct{}) f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } s.fifoSched.Schedule(f) return ch, ErrCompacted } - if rev > s.currentRev.main { + if rev > s.currentRev { return nil, ErrFutureRev } @@ -333,24 +219,14 @@ func init() { } } -func (s *store) Hash() (uint32, int64, error) { - s.mu.Lock() - defer s.mu.Unlock() - s.b.ForceCommit() - - h, err := s.b.Hash(DefaultIgnores) - rev := s.currentRev.main - return h, rev, err -} - func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() - s.tx = s.b.BatchTx() - s.tx.Lock() - s.saveIndex() - s.tx.Unlock() + tx := s.b.BatchTx() + tx.Lock() + s.saveIndex(tx) + tx.Unlock() s.b.ForceCommit() } @@ -363,10 +239,8 @@ func (s *store) Restore(b backend.Backend) error { s.b = b s.kvindex = newTreeIndex() - s.currentRev = revision{main: 1} + s.currentRev = 1 s.compactMainRev = -1 - s.tx = b.BatchTx() - s.txnID = -1 s.fifoSched = schedule.NewFIFOScheduler() s.stopc = make(chan struct{}) @@ -403,6 +277,7 @@ func (s *store) restore() error { } rev := bytesToRev(key[:revBytesLen]) + s.currentRev = rev.main // restore index switch { @@ -428,9 +303,6 @@ func (s *store) restore() error { delete(keyToLease, string(kv.Key)) } } - - // update revision - s.currentRev = rev } // restore the tree index from the unordered index. @@ -441,8 +313,8 @@ func (s *store) restore() error { // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. // the correct revision should be set to compaction revision in the case, not the largest revision // we have seen. - if s.currentRev.main < s.compactMainRev { - s.currentRev.main = s.compactMainRev + if s.currentRev < s.compactMainRev { + s.currentRev = s.compactMainRev } for key, lid := range keyToLease { @@ -490,180 +362,10 @@ func (a *store) Equal(b *store) bool { return a.kvindex.Equal(b.kvindex) } -// range is a keyword in Go, add Keys suffix. -func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) { - curRev = int64(s.currentRev.main) - if s.currentRev.sub > 0 { - curRev += 1 - } - - if rangeRev > curRev { - return nil, -1, s.currentRev.main, ErrFutureRev - } - var rev int64 - if rangeRev <= 0 { - rev = curRev - } else { - rev = rangeRev - } - if rev < s.compactMainRev { - return nil, -1, 0, ErrCompacted - } - - _, revpairs := s.kvindex.Range(key, end, int64(rev)) - if len(revpairs) == 0 { - return nil, 0, curRev, nil - } - if countOnly { - return nil, len(revpairs), curRev, nil - } - - for _, revpair := range revpairs { - start, end := revBytesRange(revpair) - - _, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0) - if len(vs) != 1 { - plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) - } - - var kv mvccpb.KeyValue - if err := kv.Unmarshal(vs[0]); err != nil { - plog.Fatalf("cannot unmarshal event: %v", err) - } - kvs = append(kvs, kv) - if limit > 0 && len(kvs) >= int(limit) { - break - } - } - return kvs, len(revpairs), curRev, nil -} - -func (s *store) put(key, value []byte, leaseID lease.LeaseID) { - s.txnModify = true - - rev := s.currentRev.main + 1 - c := rev - oldLease := lease.NoLease - - // if the key exists before, use its previous created and - // get its previous leaseID - _, created, ver, err := s.kvindex.Get(key, rev) - if err == nil { - c = created.main - oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)}) - } - - ibytes := newRevBytes() - revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes) - - ver = ver + 1 - kv := mvccpb.KeyValue{ - Key: key, - Value: value, - CreateRevision: c, - ModRevision: rev, - Version: ver, - Lease: int64(leaseID), - } - - d, err := kv.Marshal() - if err != nil { - plog.Fatalf("cannot marshal event: %v", err) - } - - s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) - s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) - s.changes = append(s.changes, kv) - s.currentRev.sub += 1 - - if oldLease != lease.NoLease { - if s.le == nil { - panic("no lessor to detach lease") - } - - err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) - if err != nil { - plog.Errorf("unexpected error from lease detach: %v", err) - } - } - - if leaseID != lease.NoLease { - if s.le == nil { - panic("no lessor to attach lease") - } - - err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) - if err != nil { - panic("unexpected error from lease Attach") - } - } -} - -func (s *store) deleteRange(key, end []byte) int64 { - s.txnModify = true - - rrev := s.currentRev.main - if s.currentRev.sub > 0 { - rrev += 1 - } - keys, revs := s.kvindex.Range(key, end, rrev) - - if len(keys) == 0 { - return 0 - } - - for i, key := range keys { - s.delete(key, revs[i]) - } - return int64(len(keys)) -} - -func (s *store) delete(key []byte, rev revision) { - mainrev := s.currentRev.main + 1 - - ibytes := newRevBytes() - revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) - ibytes = appendMarkTombstone(ibytes) - - kv := mvccpb.KeyValue{ - Key: key, - } - - d, err := kv.Marshal() - if err != nil { - plog.Fatalf("cannot marshal event: %v", err) - } - - s.tx.UnsafeSeqPut(keyBucketName, ibytes, d) - err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub}) - if err != nil { - plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) - } - s.changes = append(s.changes, kv) - s.currentRev.sub += 1 - - item := lease.LeaseItem{Key: string(key)} - leaseID := s.le.GetLease(item) - - if leaseID != lease.NoLease { - err = s.le.Detach(leaseID, []lease.LeaseItem{item}) - if err != nil { - plog.Errorf("cannot detach %v", err) - } - } -} - -func (s *store) getChanges() []mvccpb.KeyValue { - changes := s.changes - s.changes = make([]mvccpb.KeyValue, 0, 4) - return changes -} - -func (s *store) saveIndex() { +func (s *store) saveIndex(tx backend.BatchTx) { if s.ig == nil { return } - tx := s.tx bs := s.bytesBuf8 binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex()) // put the index into the underlying backend diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index c111613cea62..9f2b0c542ca8 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -78,11 +78,9 @@ func BenchmarkStoreTxnPut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - plog.Fatalf("txn put error: %v", err) - } - s.TxnEnd(id) + txn := s.Write() + txn.Put(keys[i], vals[i], lease.NoLease) + txn.End() } } @@ -100,11 +98,9 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < revsPerKey; j++ { - id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - plog.Fatalf("txn put error: %v", err) - } - s.TxnEnd(id) + txn := s.Write() + txn.Put(keys[i], vals[i], lease.NoLease) + txn.End() } } b.ResetTimer() diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 816a1b66d601..8f1972426cd4 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -72,7 +72,7 @@ func TestStorePut(t *testing.T) { indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, nil, - revision{1, 1}, + revision{2, 0}, newTestKeyBytes(revision{2, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), @@ -89,8 +89,8 @@ func TestStorePut(t *testing.T) { indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, - revision{1, 2}, - newTestKeyBytes(revision{2, 1}, false), + revision{2, 0}, + newTestKeyBytes(revision{2, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -99,14 +99,14 @@ func TestStorePut(t *testing.T) { Version: 2, Lease: 2, }, - revision{2, 1}, + revision{2, 0}, }, { revision{2, 0}, indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}}, - revision{2, 1}, + revision{3, 0}, newTestKeyBytes(revision{3, 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), @@ -124,14 +124,13 @@ func TestStorePut(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev - s.tx = b.BatchTx() + s.currentRev = tt.rev.main fi.indexGetRespc <- tt.r if tt.rr != nil { b.tx.rangeRespc <- *tt.rr } - s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) + s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1)) data, err := tt.wkv.Marshal() if err != nil { @@ -158,7 +157,7 @@ func TestStorePut(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev { + if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } @@ -179,7 +178,6 @@ func TestStoreRange(t *testing.T) { if err != nil { t.Fatal(err) } - currev := revision{1, 1} wrev := int64(2) tests := []struct { @@ -195,25 +193,26 @@ func TestStoreRange(t *testing.T) { rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } + + ro := RangeOptions{Limit: 1, Rev: 0, Count: false} for i, tt := range tests { s := newFakeStore() b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = currev - s.tx = b.BatchTx() + s.currentRev = 2 b.tx.rangeRespc <- tt.r fi.indexRangeRespc <- tt.idxr - kvs, _, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0, false) + ret, err := s.Range([]byte("foo"), []byte("goo"), ro) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) + if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) { + t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w) } - if rev != wrev { - t.Errorf("#%d: rev = %d, want %d", i, rev, wrev) + if ret.Rev != wrev { + t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev) } wstart, wend := revBytesRange(tt.idxr.revs[0]) @@ -229,8 +228,8 @@ func TestStoreRange(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != currev { - t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) + if s.currentRev != 2 { + t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2) } s.Close() @@ -267,32 +266,21 @@ func TestStoreDeleteRange(t *testing.T) { rangeResp{[][]byte{key}, [][]byte{kvb}}, newTestKeyBytes(revision{3, 0}, true), - revision{2, 1}, + revision{3, 0}, 2, revision{3, 0}, }, - { - revision{2, 1}, - indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - - newTestKeyBytes(revision{3, 1}, true), - revision{2, 2}, - 3, - revision{3, 1}, - }, } for i, tt := range tests { s := newFakeStore() b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev - s.tx = b.BatchTx() + s.currentRev = tt.rev.main fi.indexRangeRespc <- tt.r b.tx.rangeRespc <- tt.rr - n := s.deleteRange([]byte("foo"), []byte("goo")) + n, _ := s.DeleteRange([]byte("foo"), []byte("goo")) if n != 1 { t.Errorf("#%d: n = %d, want 1", i, n) } @@ -316,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev { + if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } } @@ -328,7 +316,7 @@ func TestStoreCompact(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = revision{3, 0} + s.currentRev = 3 fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(revision{1, 0}, false) key2 := newTestKeyBytes(revision{2, 0}, false) @@ -393,9 +381,8 @@ func TestStoreRestore(t *testing.T) { if s.compactMainRev != 3 { t.Errorf("compact rev = %d, want 5", s.compactMainRev) } - wrev := revision{5, 0} - if !reflect.DeepEqual(s.currentRev, wrev) { - t.Errorf("current rev = %v, want %v", s.currentRev, wrev) + if s.currentRev != 5 { + t.Errorf("current rev = %v, want 5", s.currentRev) } wact := []testutil.Action{ {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, @@ -479,18 +466,12 @@ func TestTxnPut(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { - id := s.TxnBegin() + txn := s.Write() base := int64(i + 2) - - rev, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease) - if err != nil { - t.Error("txn put error") - } - if rev != base { + if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) } - - s.TxnEnd(id) + txn.End() } } @@ -499,7 +480,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { s := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) - id := s.TxnBegin() + txn := s.Read() done := make(chan struct{}) go func() { @@ -512,7 +493,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { case <-time.After(100 * time.Millisecond): } - s.TxnEnd(id) + txn.End() select { case <-done: case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO @@ -562,15 +543,17 @@ func newFakeStore() *store { indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), indexCompactRespc: make(chan map[revision]struct{}, 1), } - return &store{ + s := &store{ b: b, le: &lease.FakeLessor{}, kvindex: fi, - currentRev: revision{}, + currentRev: 0, compactMainRev: -1, fifoSched: schedule.NewFIFOScheduler(), stopc: make(chan struct{}), } + s.ReadView, s.WriteView = &readView{s}, &writeView{s} + return s } type rangeResp struct { @@ -611,6 +594,7 @@ type fakeBackend struct { } func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } +func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go new file mode 100644 index 000000000000..1e614755190a --- /dev/null +++ b/mvcc/kvstore_txn.go @@ -0,0 +1,254 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/mvcc/mvccpb" +) + +type storeTxnRead struct { + s *store + tx backend.ReadTx + + firstRev int64 + rev int64 +} + +func (s *store) Read() TxnRead { + s.mu.RLock() + tx := s.b.ReadTx() + s.revMu.RLock() + tx.Lock() + firstRev, rev := s.compactMainRev, s.currentRev + s.revMu.RUnlock() + return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) +} + +func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } +func (tr *storeTxnRead) Rev() int64 { return tr.rev } + +func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + return tr.rangeKeys(key, end, tr.Rev(), ro) +} + +func (tr *storeTxnRead) End() { + tr.tx.Unlock() + tr.s.mu.RUnlock() +} + +type storeTxnWrite struct { + *storeTxnRead + tx backend.BatchTx + // beginRev is the revision where the txn begins; it will write to the next revision. + beginRev int64 + changes []mvccpb.KeyValue +} + +func (s *store) Write() TxnWrite { + s.mu.RLock() + tx := s.b.BatchTx() + tx.Lock() + tw := &storeTxnWrite{ + storeTxnRead: &storeTxnRead{s, tx, 0, 0}, + tx: tx, + beginRev: s.currentRev, + changes: make([]mvccpb.KeyValue, 0, 4), + } + return newMetricsTxnWrite(tw) +} + +func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev } + +func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { + rev := tw.beginRev + if len(tw.changes) > 0 { + rev++ + } + return tw.rangeKeys(key, end, rev, ro) +} + +func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) { + if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 { + return n, int64(tw.beginRev + 1) + } + return 0, int64(tw.beginRev) +} + +func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { + tw.put(key, value, lease) + return int64(tw.beginRev + 1) +} + +func (tw *storeTxnWrite) End() { + // only update index if the txn modifies the mvcc state. + if len(tw.changes) != 0 { + tw.s.saveIndex(tw.tx) + // hold revMu lock to prevent new read txns from opening until writeback. + tw.s.revMu.Lock() + tw.s.currentRev++ + } + tw.tx.Unlock() + if len(tw.changes) != 0 { + tw.s.revMu.Unlock() + } + dbTotalSize.Set(float64(tw.s.b.Size())) + tw.s.mu.RUnlock() +} + +func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { + rev := ro.Rev + if rev > curRev { + return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev + } + if rev <= 0 { + rev = curRev + } + if rev < tr.s.compactMainRev { + return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted + } + + _, revpairs := tr.s.kvindex.Range(key, end, int64(rev)) + if len(revpairs) == 0 { + return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil + } + if ro.Count { + return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil + } + + var kvs []mvccpb.KeyValue + for _, revpair := range revpairs { + start, end := revBytesRange(revpair) + _, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0) + if len(vs) != 1 { + plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub) + } + + var kv mvccpb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { + plog.Fatalf("cannot unmarshal event: %v", err) + } + kvs = append(kvs, kv) + if ro.Limit > 0 && len(kvs) >= int(ro.Limit) { + break + } + } + return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil +} + +func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { + rev := tw.beginRev + 1 + c := rev + oldLease := lease.NoLease + + // if the key exists before, use its previous created and + // get its previous leaseID + _, created, ver, err := tw.s.kvindex.Get(key, rev) + if err == nil { + c = created.main + oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) + } + + ibytes := newRevBytes() + idxRev := revision{main: rev, sub: int64(len(tw.changes))} + revToBytes(idxRev, ibytes) + + ver = ver + 1 + kv := mvccpb.KeyValue{ + Key: key, + Value: value, + CreateRevision: c, + ModRevision: rev, + Version: ver, + Lease: int64(leaseID), + } + + d, err := kv.Marshal() + if err != nil { + plog.Fatalf("cannot marshal event: %v", err) + } + + tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + tw.s.kvindex.Put(key, idxRev) + tw.changes = append(tw.changes, kv) + + if oldLease != lease.NoLease { + if tw.s.le == nil { + panic("no lessor to detach lease") + } + err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) + if err != nil { + plog.Errorf("unexpected error from lease detach: %v", err) + } + } + if leaseID != lease.NoLease { + if tw.s.le == nil { + panic("no lessor to attach lease") + } + err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) + if err != nil { + panic("unexpected error from lease Attach") + } + } +} + +func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { + rrev := tw.beginRev + if len(tw.changes) > 0 { + rrev += 1 + } + keys, revs := tw.s.kvindex.Range(key, end, rrev) + if len(keys) == 0 { + return 0 + } + for i, key := range keys { + tw.delete(key, revs[i]) + } + return int64(len(keys)) +} + +func (tw *storeTxnWrite) delete(key []byte, rev revision) { + ibytes := newRevBytes() + idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} + revToBytes(idxRev, ibytes) + ibytes = appendMarkTombstone(ibytes) + + kv := mvccpb.KeyValue{Key: key} + + d, err := kv.Marshal() + if err != nil { + plog.Fatalf("cannot marshal event: %v", err) + } + + tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) + err = tw.s.kvindex.Tombstone(key, idxRev) + if err != nil { + plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) + } + tw.changes = append(tw.changes, kv) + + item := lease.LeaseItem{Key: string(key)} + leaseID := tw.s.le.GetLease(item) + + if leaseID != lease.NoLease { + err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item}) + if err != nil { + plog.Errorf("cannot detach %v", err) + } + } +} + +func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes } diff --git a/mvcc/metrics_txn.go b/mvcc/metrics_txn.go new file mode 100644 index 000000000000..fd2144279aea --- /dev/null +++ b/mvcc/metrics_txn.go @@ -0,0 +1,67 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "github.com/coreos/etcd/lease" +) + +type metricsTxnWrite struct { + TxnWrite + ranges uint + puts uint + deletes uint +} + +func newMetricsTxnRead(tr TxnRead) TxnRead { + return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0} +} + +func newMetricsTxnWrite(tw TxnWrite) TxnWrite { + return &metricsTxnWrite{tw, 0, 0, 0} +} + +func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) { + tw.ranges++ + return tw.TxnWrite.Range(key, end, ro) +} + +func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) { + tw.deletes++ + return tw.TxnWrite.DeleteRange(key, end) +} + +func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { + tw.puts++ + return tw.TxnWrite.Put(key, value, lease) +} + +func (tw *metricsTxnWrite) End() { + defer tw.TxnWrite.End() + if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 { + if sum > 1 { + txnCounter.Inc() + } + return + } + switch { + case tw.ranges == 1: + rangeCounter.Inc() + case tw.puts == 1: + putCounter.Inc() + case tw.deletes == 1: + deleteCounter.Inc() + } +} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index dbb79bcb6930..ce852fddea2d 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -41,10 +41,12 @@ type watchable interface { } type watchableStore struct { - mu sync.Mutex - *store + // mu protects watcher groups and batches. It should never be locked + // before locking store.mu to avoid deadlock. + mu sync.RWMutex + // victims are watcher batches that were blocked on the watch channel victims []watcherBatch victimc chan struct{} @@ -76,9 +78,11 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet synced: newWatcherGroup(), stopc: make(chan struct{}), } + s.store.ReadView = &readView{s} + s.store.WriteView = &writeView{s} if s.le != nil { // use this store as the deleter so revokes trigger watch events - s.le.SetRangeDeleter(s) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) } s.wg.Add(2) go s.syncWatchersLoop() @@ -86,89 +90,6 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet return s } -func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) { - s.mu.Lock() - defer s.mu.Unlock() - - rev = s.store.Put(key, value, lease) - changes := s.store.getChanges() - if len(changes) != 1 { - plog.Panicf("unexpected len(changes) != 1 after put") - } - - ev := mvccpb.Event{ - Type: mvccpb.PUT, - Kv: &changes[0], - } - s.notify(rev, []mvccpb.Event{ev}) - return rev -} - -func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { - s.mu.Lock() - defer s.mu.Unlock() - - n, rev = s.store.DeleteRange(key, end) - changes := s.store.getChanges() - - if len(changes) != int(n) { - plog.Panicf("unexpected len(changes) != n after deleteRange") - } - - if n == 0 { - return n, rev - } - - evs := make([]mvccpb.Event, n) - for i := range changes { - evs[i] = mvccpb.Event{ - Type: mvccpb.DELETE, - Kv: &changes[i]} - evs[i].Kv.ModRevision = rev - } - s.notify(rev, evs) - return n, rev -} - -func (s *watchableStore) TxnBegin() int64 { - s.mu.Lock() - return s.store.TxnBegin() -} - -func (s *watchableStore) TxnEnd(txnID int64) error { - err := s.store.TxnEnd(txnID) - if err != nil { - return err - } - - changes := s.getChanges() - if len(changes) == 0 { - s.mu.Unlock() - return nil - } - - rev := s.store.Rev() - evs := make([]mvccpb.Event, len(changes)) - for i, change := range changes { - switch change.CreateRevision { - case 0: - evs[i] = mvccpb.Event{ - Type: mvccpb.DELETE, - Kv: &changes[i]} - evs[i].Kv.ModRevision = rev - default: - evs[i] = mvccpb.Event{ - Type: mvccpb.PUT, - Kv: &changes[i]} - } - } - - s.notify(rev, evs) - s.mu.Unlock() - - return nil -} - func (s *watchableStore) Close() error { close(s.stopc) s.wg.Wait() @@ -186,9 +107,6 @@ func (s *watchableStore) NewWatchStream() WatchStream { } func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { - s.mu.Lock() - defer s.mu.Unlock() - wa := &watcher{ key: key, end: end, @@ -198,21 +116,24 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c fcs: fcs, } - s.store.mu.Lock() - synced := startRev > s.store.currentRev.main || startRev == 0 + s.mu.Lock() + s.revMu.RLock() + synced := startRev > s.store.currentRev || startRev == 0 if synced { - wa.minRev = s.store.currentRev.main + 1 + wa.minRev = s.store.currentRev + 1 if startRev > wa.minRev { wa.minRev = startRev } } - s.store.mu.Unlock() if synced { s.synced.add(wa) } else { slowWatcherGauge.Inc() s.unsynced.add(wa) } + s.revMu.RUnlock() + s.mu.Unlock() + watcherGauge.Inc() return wa, func() { s.cancelWatcher(wa) } @@ -263,12 +184,15 @@ func (s *watchableStore) syncWatchersLoop() { defer s.wg.Done() for { - s.mu.Lock() + s.mu.RLock() st := time.Now() lastUnsyncedWatchers := s.unsynced.size() - s.syncWatchers() - unsyncedWatchers := s.unsynced.size() - s.mu.Unlock() + s.mu.RUnlock() + + unsyncedWatchers := 0 + if lastUnsyncedWatchers > 0 { + unsyncedWatchers = s.syncWatchers() + } syncDuration := time.Since(st) waitDuration := 100 * time.Millisecond @@ -295,9 +219,9 @@ func (s *watchableStore) syncVictimsLoop() { for s.moveVictims() != 0 { // try to update all victim watchers } - s.mu.Lock() + s.mu.RLock() isEmpty := len(s.victims) == 0 - s.mu.Unlock() + s.mu.RUnlock() var tickc <-chan time.Time if !isEmpty { @@ -340,8 +264,8 @@ func (s *watchableStore) moveVictims() (moved int) { // assign completed victim watchers to unsync/sync s.mu.Lock() - s.store.mu.Lock() - curRev := s.store.currentRev.main + s.store.revMu.RLock() + curRev := s.store.currentRev for w, eb := range wb { if newVictim != nil && newVictim[w] != nil { // couldn't send watch response; stays victim @@ -358,7 +282,7 @@ func (s *watchableStore) moveVictims() (moved int) { s.synced.add(w) } } - s.store.mu.Unlock() + s.store.revMu.RUnlock() s.mu.Unlock() } @@ -376,19 +300,23 @@ func (s *watchableStore) moveVictims() (moved int) { // 2. iterate over the set to get the minimum revision and remove compacted watchers // 3. use minimum revision to get all key-value pairs and send those events to watchers // 4. remove synced watchers in set from unsynced group and move to synced group -func (s *watchableStore) syncWatchers() { +func (s *watchableStore) syncWatchers() int { + s.mu.Lock() + defer s.mu.Unlock() + if s.unsynced.size() == 0 { - return + return 0 } - s.store.mu.Lock() - defer s.store.mu.Unlock() + s.store.revMu.RLock() + defer s.store.revMu.RUnlock() // in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs - curRev := s.store.currentRev.main + curRev := s.store.currentRev compactionRev := s.store.compactMainRev + wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) @@ -396,7 +324,7 @@ func (s *watchableStore) syncWatchers() { // UnsafeRange returns keys and values. And in boltdb, keys are revisions. // values are actual key-value pairs in backend. - tx := s.store.b.BatchTx() + tx := s.store.b.ReadTx() tx.Lock() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) evs := kvsToEvents(wg, revs, vs) @@ -446,6 +374,8 @@ func (s *watchableStore) syncWatchers() { vsz += len(v) } slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) + + return s.unsynced.size() } // kvsToEvents gets all events for the watchers from all key-value pairs @@ -511,8 +441,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) { func (s *watchableStore) rev() int64 { return s.store.Rev() } func (s *watchableStore) progress(w *watcher) { - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() if _, ok := s.synced.watchers[w]; ok { w.send(WatchResponse{WatchID: w.id, Revision: s.rev()}) diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index c6eedfe2040f..769d1bc38a83 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -57,11 +57,9 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { - plog.Fatalf("txn put error: %v", err) - } - s.TxnEnd(id) + txn := s.Write() + txn.Put(keys[i], vals[i], lease.NoLease) + txn.End() } } diff --git a/mvcc/watchable_store_txn.go b/mvcc/watchable_store_txn.go new file mode 100644 index 000000000000..5c5bfda13413 --- /dev/null +++ b/mvcc/watchable_store_txn.go @@ -0,0 +1,53 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "github.com/coreos/etcd/mvcc/mvccpb" +) + +func (tw *watchableStoreTxnWrite) End() { + changes := tw.Changes() + if len(changes) == 0 { + tw.TxnWrite.End() + return + } + + rev := tw.Rev() + 1 + evs := make([]mvccpb.Event, len(changes)) + for i, change := range changes { + evs[i].Kv = &changes[i] + if change.CreateRevision == 0 { + evs[i].Type = mvccpb.DELETE + evs[i].Kv.ModRevision = rev + } else { + evs[i].Type = mvccpb.PUT + } + } + + // end write txn under watchable store lock so the updates are visible + // when asynchronous event posting checks the current store revision + tw.s.mu.Lock() + tw.s.notify(rev, evs) + tw.TxnWrite.End() + tw.s.mu.Unlock() +} + +type watchableStoreTxnWrite struct { + TxnWrite + s *watchableStore +} + +func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }