Skip to content

Commit

Permalink
mvcc: txns and r/w views
Browse files Browse the repository at this point in the history
Clean-up of the mvcc interfaces to use txn interfaces instead of an id.

Adds support for concurrent read-only mvcc transactions.

Fixes etcd-io#7083
  • Loading branch information
Anthony Romano committed Feb 15, 2017
1 parent b1641a9 commit 20763bb
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 352 deletions.
65 changes: 40 additions & 25 deletions mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ type RangeResult struct {
Count int
}

type KV interface {
// Rev returns the current revision of the KV.
Rev() int64

type ReadView interface {
// FirstRev returns the first revision of the KV.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64

// Rev returns the revision of the KV at the time of the txn.
Rev() int64

// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
Expand All @@ -50,14 +50,17 @@ type KV interface {
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}

// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
End()
}

type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of key deleted will be returned.
Expand All @@ -67,26 +70,38 @@ type KV interface {
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)

// TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
// until txn ends. Only one on-going txn is allowed.
// TxnBegin returns an int64 txn ID.
// All txn prefixed operations with same txn ID will be done with the same rev.
TxnBegin() int64
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
TxnEnd(txnID int64) error
// TxnRange returns the current revision of the KV when the operation is executed.
TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}

// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
}

// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)
type KV interface {
ReadView
WriteView

// Read creates a read transaction.
Read() TxnRead

// Write creates a write transaction.
Write() TxnWrite

// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purpose.
// This method is designed for consistency checking purposes.
Hash() (hash uint32, revision int64, err error)

// Commit commits txns into the underlying backend.
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)

// Commit commits outstanding txns into the underlying backend.
Commit()

// Restore restores the KV store from a backend.
Expand Down
84 changes: 17 additions & 67 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,27 @@ var (
return kv.Range(key, end, ro)
}
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
id := kv.TxnBegin()
defer kv.TxnEnd(id)
return kv.TxnRange(id, key, end, ro)
txn := kv.Read()
defer txn.End()
return txn.Range(key, end, ro)
}

normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
return kv.Put(key, value, lease)
}
txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
id := kv.TxnBegin()
defer kv.TxnEnd(id)
rev, err := kv.TxnPut(id, key, value, lease)
if err != nil {
panic("txn put error")
}
return rev
txn := kv.Write()
defer txn.End()
return txn.Put(key, value, lease)
}

normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
return kv.DeleteRange(key, end)
}
txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
id := kv.TxnBegin()
defer kv.TxnEnd(id)
n, rev, err := kv.TxnDeleteRange(id, key, end)
if err != nil {
panic("txn delete error")
}
return n, rev
txn := kv.Write()
defer txn.End()
return txn.DeleteRange(key, end)
}
)

Expand Down Expand Up @@ -414,7 +406,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
func() { s.DeleteRange([]byte("foo"), nil) },
}
for i, tt := range tests {
id := s.TxnBegin()
txn := s.Write()
done := make(chan struct{}, 1)
go func() {
tt()
Expand All @@ -426,7 +418,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}

s.TxnEnd(id)
txn.End()
select {
case <-done:
case <-time.After(10 * time.Second):
Expand All @@ -438,62 +430,23 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
cleanup(s, b, tmpPath)
}

func TestKVTxnWrongID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

id := s.TxnBegin()
wrongid := id + 1

tests := []func() error{
func() error {
_, err := s.TxnRange(wrongid, []byte("foo"), nil, RangeOptions{})
return err
},
func() error {
_, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease)
return err
},
func() error {
_, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil)
return err
},
func() error { return s.TxnEnd(wrongid) },
}
for i, tt := range tests {
err := tt()
if err != ErrTxnIDMismatch {
t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch)
}
}

err := s.TxnEnd(id)
if err != nil {
t.Fatalf("end err = %+v, want %+v", err, nil)
}
}

// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTxnOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
id := s.TxnBegin()
txn := s.Write()
base := int64(i + 1)

// put foo
rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease)
if err != nil {
t.Fatal(err)
}
rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease)
if rev != base+1 {
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
}

r, err := s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
if err != nil {
t.Fatal(err)
}
Expand All @@ -508,15 +461,12 @@ func TestKVTxnOperationInSequence(t *testing.T) {
}

// delete foo
n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil)
if err != nil {
t.Fatal(err)
}
n, rev := txn.DeleteRange([]byte("foo"), nil)
if n != 1 || rev != base+1 {
t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
}

r, err = s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
if err != nil {
t.Errorf("#%d: range error (%v)", i, err)
}
Expand All @@ -527,7 +477,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1)
}

s.TxnEnd(id)
txn.End()
}
}

Expand Down
53 changes: 53 additions & 0 deletions mvcc/kv_view.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
"github.com/coreos/etcd/lease"
)

type readView struct{ kv KV }

func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.FirstRev()
}

func (rv *readView) Rev() int64 {
tr := rv.kv.Read()
defer tr.End()
return tr.Rev()
}

func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read()
defer tr.End()
return tr.Range(key, end, ro)
}

type writeView struct{ kv KV }

func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.DeleteRange(key, end)
}

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write()
defer tw.End()
return tw.Put(key, value, lease)
}
Loading

0 comments on commit 20763bb

Please sign in to comment.